From e6bf3fe7f81ff7651b3e8be3d530be725090ed2c Mon Sep 17 00:00:00 2001 From: Adrien Ferrand Date: Fri, 26 Jul 2019 00:25:36 +0200 Subject: [PATCH] [Windows] Security model for files permissions - STEP 3f (#7233) * Correct file permissions on TempHandler * Forbid os.chown and os.geteuid, as theses functions can be harmful to the security model on Windows. * Implement copy_ownership * Apply copy_ownership * Correct webroot tests (and activate another broken test !) * Correct lint and mypy * Ensure to apply mode in makedirs * Apply strict permissions on directories created with tempfile.mkdtemp(), like on Unix. * Ensure streamHandler has 0600 on Windows * Reactivate a test on windows * Pin oldest requirements to current internal libraries (acme and certbot) * Add dynamically pywin32 in dependencies: always except for certbot-oldest to avoid to break the relevant tests. * Administrative privileges are always required. * Correct security implementation (not the logic yet) * First correction. Allow to manipulate finely file permissions during their generation * Align to master + fix lint + resolve correctly symbolic links * Add a test for windows about default paths * Strenghthen the detection of Linux/Windows to check the standard files layout. * Fix lint and mypy * Reflect non usage of cache discovery from dns google plugin to its tests, solving Windows tests on the way * Apply suggestions from code review Co-Authored-By: adferrand * Add more details in a comment * Retrigger build. * Add documentation. * Fix a test * Correct RW clear down * Update util.py * Remove unused code * Fix code style * Adapt certbot coverage threshold on Linux due to Windows specific LOC addition. * Various optimizations around file owner and file mode * Fix last error * Fix copy_ownership_and_apply_mode * Fix lint * Correct mypy * Extract out first part from windows-file-permissions * Ignore new_compat in coverage for now * Create test package for compat * Add unit tests for security module. * Add pywin32 * Adapt linux coverages to the windows-specific LOCs added * Clean imports * Correct import * Trigger CI * Reactivate a test * Create the certbot.compat package. Move logic in certbot.compat.misc * Clean comment * Add doc * Fix lint * Correct mypy * Add executable permissions * Add the delegate certbot.compat.os module, add check coding style to enforce usage of certbot.compat.os instead of standard os * Load certbot.compat.os instead of os * Move existing compat test * Update local oldest requirements * Import sys * Fix some mocks * Update account_test.py * Update os.py * Update os.py * Update local oldest requirements * Implement the new linter_plugin * Fix remaining linting errors * Fix local oldest for nginx * Remove custom check in favor of pylint plugin * Remove check coding style * Update linter_plugin.py Co-Authored-By: adferrand * Add several comments * Update the setup.py * Add documentation * Update acme dependencies * Update certbot/compat/os.py Co-Authored-By: adferrand * Update certbot/compat/os.py Co-Authored-By: adferrand * Update certbot/compat/os.py Co-Authored-By: adferrand * Update docs/contributing.rst Co-Authored-By: adferrand * Update linter_plugin.py Co-Authored-By: adferrand * Update linter_plugin.py Co-Authored-By: adferrand * Update docs/contributing.rst Co-Authored-By: adferrand * Update docs/contributing.rst Co-Authored-By: adferrand * Corrections * Handle os.path. Simplify checker. * Add a comment to a reference implementation * Update changelog * Fix module registering * Update docs/contributing.rst Co-Authored-By: adferrand * Update docs/contributing.rst Co-Authored-By: adferrand * Update docs/contributing.rst Co-Authored-By: adferrand * Update config and changelog * Correction * Correct os * Fix merge * Disable pylint checks * Normalize imports * Simplify security * Corrections * Reorganize module * Clean code * Clean code * Remove coverage * No cover * Implement security.chmod * Disable a test for now * Disable hard error for now * Add a first test. Remove unused import * Recalibrate coverage * Modifications for misc * Correct function call * Add some types * Remove newline * Use os_rename * Implement security.open * Revert to windows-files-permissions approach * Fix lint * Implement security.mkdir and security.makedirs * Fix lint * Clean lint * Clean lint * Revert "Clean lint" This reverts commit 83bf81960ac6bf3f76c286ca065a5ac850c6870b. * Correct mock * Conditionally add pywin32 on setuptools versions that support environment markers. * Fix separator * Fix separator * Rename security into filesystem * Change module security to filesystem * Move rename into filesystem * Rename security into filesystem * Rename security into filesystem * Rerun CI * Fix import * Fix pylint * Implement copy_ownership_and_apply_mode * Fix pylint * Update certbot/compat/os.py Co-Authored-By: Brad Warren * Remove default values * Rewrite a comment. * Relaunch CI * Pass as keyword arguments * Update certbot/compat/filesystem.py Co-Authored-By: Brad Warren * Update certbot/compat/filesystem.py Co-Authored-By: Brad Warren * Update certbot/compat/filesystem.py Co-Authored-By: Brad Warren * Make the private key permissions transfer platform specific * Update certbot/compat/filesystem.py Co-Authored-By: Brad Warren * Rename variable * Fix comment0 * Add unit test for copy_ownership_and_apply_mode * Adapt coverage * Implement new methods. * Remove the old method * Reimplement make_or_verify_dir * Finish migration * Start to fix tests * Fix ownership when creating a file with filesystem.open * Fix security on TempHandler * Fix validation path permissions * Fix owner on mkdir * Use a proper workdir for crypto tests * Fix pylint * Adapt coverage * Update storage_test.py * Update util_test.py * Clean code * Update certbot/compat/filesystem.py Co-Authored-By: ohemorange * Add comment * Update certbot/compat/filesystem.py Co-Authored-By: ohemorange * Check permissions * Change test mode * Add unit test for filesystem.check_* functions * Update filesystem_test.py * Better logic for TempHandler * Adapt coverage --- .codecov.yml | 4 +- certbot-nginx/certbot_nginx/configurator.py | 11 +-- certbot/account.py | 7 +- certbot/cert_manager.py | 5 +- certbot/client.py | 5 +- certbot/compat/filesystem.py | 75 +++++++++++++++++++++ certbot/compat/misc.py | 26 ------- certbot/crypto_util.py | 7 +- certbot/log.py | 12 ++-- certbot/main.py | 10 +-- certbot/plugins/webroot.py | 3 +- certbot/plugins/webroot_test.py | 5 +- certbot/reverter.py | 10 +-- certbot/tests/cert_manager_test.py | 30 +++------ certbot/tests/compat/filesystem_test.py | 49 +++++++++++++- certbot/tests/crypto_util_test.py | 10 ++- certbot/tests/log_test.py | 5 +- certbot/tests/main_test.py | 10 ++- certbot/tests/storage_test.py | 19 ++---- certbot/tests/util_test.py | 52 ++------------ certbot/util.py | 30 ++------- 21 files changed, 194 insertions(+), 191 deletions(-) diff --git a/.codecov.yml b/.codecov.yml index 86813de6a..f4d4d1d6c 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -6,13 +6,13 @@ coverage: flags: linux # Fixed target instead of auto set by #7173, can # be removed when flags in Codecov are added back. - target: 97.6 + target: 97.5 threshold: 0.1 base: auto windows: flags: windows # Fixed target instead of auto set by #7173, can # be removed when flags in Codecov are added back. - target: 97.0 + target: 97.2 threshold: 0.1 base: auto diff --git a/certbot-nginx/certbot_nginx/configurator.py b/certbot-nginx/certbot_nginx/configurator.py index e078ad4cb..0c6eabf57 100644 --- a/certbot-nginx/certbot_nginx/configurator.py +++ b/certbot-nginx/certbot_nginx/configurator.py @@ -20,7 +20,6 @@ from certbot import crypto_util from certbot import errors from certbot import interfaces from certbot import util -from certbot.compat import misc from certbot.compat import os from certbot.plugins import common @@ -903,13 +902,9 @@ class NginxConfigurator(common.Installer): have permissions of root. """ - uid = misc.os_geteuid() - util.make_or_verify_dir( - self.config.work_dir, core_constants.CONFIG_DIRS_MODE, uid) - util.make_or_verify_dir( - self.config.backup_dir, core_constants.CONFIG_DIRS_MODE, uid) - util.make_or_verify_dir( - self.config.config_dir, core_constants.CONFIG_DIRS_MODE, uid) + util.make_or_verify_dir(self.config.work_dir, core_constants.CONFIG_DIRS_MODE) + util.make_or_verify_dir(self.config.backup_dir, core_constants.CONFIG_DIRS_MODE) + util.make_or_verify_dir(self.config.config_dir, core_constants.CONFIG_DIRS_MODE) def get_version(self): """Return version of Nginx Server. diff --git a/certbot/account.py b/certbot/account.py index bf5c131db..7a1e2de7a 100644 --- a/certbot/account.py +++ b/certbot/account.py @@ -20,7 +20,6 @@ from certbot import constants from certbot import errors from certbot import interfaces from certbot import util -from certbot.compat import misc from certbot.compat import os logger = logging.getLogger(__name__) @@ -139,8 +138,7 @@ class AccountFileStorage(interfaces.AccountStorage): """ def __init__(self, config): self.config = config - util.make_or_verify_dir(config.accounts_dir, 0o700, misc.os_geteuid(), - self.config.strict_permissions) + util.make_or_verify_dir(config.accounts_dir, 0o700, self.config.strict_permissions) def _account_dir_path(self, account_id): return self._account_dir_path_for_server_path(account_id, self.config.server_path) @@ -322,8 +320,7 @@ class AccountFileStorage(interfaces.AccountStorage): def _save(self, account, acme, regr_only): account_dir_path = self._account_dir_path(account.id) - util.make_or_verify_dir(account_dir_path, 0o700, misc.os_geteuid(), - self.config.strict_permissions) + util.make_or_verify_dir(account_dir_path, 0o700, self.config.strict_permissions) try: with open(self._regr_path(account_dir_path), "w") as regr_file: regr = account.regr diff --git a/certbot/cert_manager.py b/certbot/cert_manager.py index ab929b597..6d6d2e2e6 100644 --- a/certbot/cert_manager.py +++ b/certbot/cert_manager.py @@ -15,7 +15,6 @@ from certbot import interfaces from certbot import ocsp from certbot import storage from certbot import util -from certbot.compat import misc from certbot.compat import os from certbot.display import util as display_util @@ -106,7 +105,7 @@ def lineage_for_certname(cli_config, certname): """Find a lineage object with name certname.""" configs_dir = cli_config.renewal_configs_dir # Verify the directory is there - util.make_or_verify_dir(configs_dir, mode=0o755, uid=misc.os_geteuid()) + util.make_or_verify_dir(configs_dir, mode=0o755) try: renewal_file = storage.renewal_file_for_certname(cli_config, certname) except errors.CertStorageError: @@ -375,7 +374,7 @@ def _search_lineages(cli_config, func, initial_rv, *args): """ configs_dir = cli_config.renewal_configs_dir # Verify the directory is there - util.make_or_verify_dir(configs_dir, mode=0o755, uid=misc.os_geteuid()) + util.make_or_verify_dir(configs_dir, mode=0o755) rv = initial_rv for renewal_file in storage.renewal_conf_files(cli_config): diff --git a/certbot/client.py b/certbot/client.py index 4233a6343..7372d6d9d 100644 --- a/certbot/client.py +++ b/certbot/client.py @@ -30,7 +30,6 @@ from certbot import interfaces from certbot import reverter from certbot import storage from certbot import util -from certbot.compat import misc from certbot.compat import os from certbot.display import enhancements from certbot.display import ops as display_ops @@ -459,9 +458,7 @@ class Client(object): """ for path in cert_path, chain_path, fullchain_path: - util.make_or_verify_dir( - os.path.dirname(path), 0o755, misc.os_geteuid(), - self.config.strict_permissions) + util.make_or_verify_dir(os.path.dirname(path), 0o755, self.config.strict_permissions) cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path) diff --git a/certbot/compat/filesystem.py b/certbot/compat/filesystem.py index e2757eb2f..a38fbe760 100644 --- a/certbot/compat/filesystem.py +++ b/certbot/compat/filesystem.py @@ -77,6 +77,54 @@ def copy_ownership_and_apply_mode(src, dst, mode, copy_user, copy_group): chmod(dst, mode) +def check_mode(file_path, mode): + # type: (str, int) -> bool + """ + Check if the given mode matches the permissions of the given file. + On Linux, will make a direct comparison, on Windows, mode will be compared against + the security model. + :param str file_path: Path of the file + :param int mode: POSIX mode to test + :rtype: bool + :return: True if the POSIX mode matches the file permissions + """ + if POSIX_MODE: + return stat.S_IMODE(os.stat(file_path).st_mode) == mode + + return _check_win_mode(file_path, mode) + + +def check_owner(file_path): + # type: (str) -> bool + """ + Check if given file is owned by current user. + :param str file_path: File path to check + :rtype: bool + :return: True if given file is owned by current user, False otherwise. + """ + if POSIX_MODE: + return os.stat(file_path).st_uid == os.getuid() + + # Get owner sid of the file + security = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION) + user = security.GetSecurityDescriptorOwner() + + # Compare sids + return _get_current_user() == user + + +def check_permissions(file_path, mode): + # type: (str, int) -> bool + """ + Check if given file has the given mode and is owned by current user. + :param str file_path: File path to check + :param int mode: POSIX mode to check + :rtype: bool + :return: True if file has correct mode and owner, False otherwise. + """ + return check_owner(file_path) and check_mode(file_path, mode) + + def open(file_path, flags, mode=0o777): # pylint: disable=redefined-builtin # type: (str, int, int) -> int """ @@ -107,6 +155,10 @@ def open(file_path, flags, mode=0o777): # pylint: disable=redefined-builtin security = attributes.SECURITY_DESCRIPTOR user = _get_current_user() dacl = _generate_dacl(user, mode) + # We set second parameter to 0 (`False`) to say that this security descriptor is + # NOT constructed from a default mechanism, but is explicitly set by the user. + # See https://docs.microsoft.com/en-us/windows/desktop/api/securitybaseapi/nf-securitybaseapi-setsecuritydescriptorowner # pylint: disable=line-too-long + security.SetSecurityDescriptorOwner(user, 0) # We set first parameter to 1 (`True`) to say that this security descriptor contains # a DACL. Otherwise second and third parameters are ignored. # We set third parameter to 0 (`False`) to say that this security descriptor is @@ -177,6 +229,7 @@ def mkdir(file_path, mode=0o777): security = attributes.SECURITY_DESCRIPTOR user = _get_current_user() dacl = _generate_dacl(user, mode) + security.SetSecurityDescriptorOwner(user, False) security.SetSecurityDescriptorDacl(1, dacl, 0) try: @@ -353,6 +406,28 @@ def _generate_windows_flags(rights_desc): return flag +def _check_win_mode(file_path, mode): + # Resolve symbolic links + file_path = realpath(file_path) + # Get current dacl file + security = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION + | win32security.DACL_SECURITY_INFORMATION) + dacl = security.GetSecurityDescriptorDacl() + + # Get current file owner sid + user = security.GetSecurityDescriptorOwner() + + if not dacl: + # No DACL means full control to everyone + # This is not a deterministic permissions set. + return False + + # Calculate the target dacl + ref_dacl = _generate_dacl(user, mode) + + return _compare_dacls(dacl, ref_dacl) + + def _compare_dacls(dacl1, dacl2): """ This method compare the two given DACLs to check if they are identical. diff --git a/certbot/compat/misc.py b/certbot/compat/misc.py index c840c333c..693fceefb 100644 --- a/certbot/compat/misc.py +++ b/certbot/compat/misc.py @@ -42,22 +42,6 @@ def raise_for_non_administrative_windows_rights(): raise errors.Error('Error, certbot must be run on a shell with administrative rights.') -def os_geteuid(): - """ - Get current user uid - - :returns: The current user uid. - :rtype: int - - """ - try: - # Linux specific - return os.geteuid() - except AttributeError: - # Windows specific - return 0 - - def readline_with_timeout(timeout, prompt): # type: (float, str) -> str """ @@ -88,16 +72,6 @@ def readline_with_timeout(timeout, prompt): return sys.stdin.readline() -def compare_file_modes(mode1, mode2): - """Return true if the two modes can be considered as equals for this platform""" - if os.name != 'nt': - # Linux specific: standard compare - return oct(stat.S_IMODE(mode1)) == oct(stat.S_IMODE(mode2)) - # Windows specific: most of mode bits are ignored on Windows. Only check user R/W rights. - return (stat.S_IMODE(mode1) & stat.S_IREAD == stat.S_IMODE(mode2) & stat.S_IREAD - and stat.S_IMODE(mode1) & stat.S_IWRITE == stat.S_IMODE(mode2) & stat.S_IWRITE) - - WINDOWS_DEFAULT_FOLDERS = { 'config': 'C:\\Certbot', 'work': 'C:\\Certbot\\lib', diff --git a/certbot/crypto_util.py b/certbot/crypto_util.py index 281a76668..12291af38 100644 --- a/certbot/crypto_util.py +++ b/certbot/crypto_util.py @@ -28,7 +28,6 @@ from acme.magic_typing import IO # pylint: disable=unused-import, no-name-in-mo from certbot import errors from certbot import interfaces from certbot import util -from certbot.compat import misc from certbot.compat import os logger = logging.getLogger(__name__) @@ -61,8 +60,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"): config = zope.component.getUtility(interfaces.IConfig) # Save file - util.make_or_verify_dir(key_dir, 0o700, misc.os_geteuid(), - config.strict_permissions) + util.make_or_verify_dir(key_dir, 0o700, config.strict_permissions) key_f, key_path = util.unique_file( os.path.join(key_dir, keyname), 0o600, "wb") with key_f: @@ -92,8 +90,7 @@ def init_save_csr(privkey, names, path): privkey.pem, names, must_staple=config.must_staple) # Save CSR - util.make_or_verify_dir(path, 0o755, misc.os_geteuid(), - config.strict_permissions) + util.make_or_verify_dir(path, 0o755, config.strict_permissions) csr_f, csr_filename = util.unique_file( os.path.join(path, "csr-certbot.pem"), 0o644, "wb") with csr_f: diff --git a/certbot/log.py b/certbot/log.py index bf444de07..a16e2ef7e 100644 --- a/certbot/log.py +++ b/certbot/log.py @@ -17,6 +17,7 @@ from __future__ import print_function import functools import logging import logging.handlers +import shutil import sys import tempfile import traceback @@ -26,7 +27,6 @@ from acme import messages from certbot import constants from certbot import errors from certbot import util -from certbot.compat import misc from certbot.compat import os # Logging format @@ -134,8 +134,7 @@ def setup_log_file_handler(config, logfile, fmt): """ # TODO: logs might contain sensitive data such as contents of the # private key! #525 - util.set_up_core_dir( - config.logs_dir, 0o700, misc.os_geteuid(), config.strict_permissions) + util.set_up_core_dir(config.logs_dir, 0o700, config.strict_permissions) log_file_path = os.path.join(config.logs_dir, logfile) try: handler = logging.handlers.RotatingFileHandler( @@ -240,9 +239,10 @@ class TempHandler(logging.StreamHandler): """ def __init__(self): - stream = tempfile.NamedTemporaryFile('w', delete=False) + self._workdir = tempfile.mkdtemp() + self.path = os.path.join(self._workdir, 'log') + stream = util.safe_open(self.path, mode='w', chmod=0o600) super(TempHandler, self).__init__(stream) - self.path = stream.name self._delete = True def emit(self, record): @@ -266,7 +266,7 @@ class TempHandler(logging.StreamHandler): # stream like stderr to be used self.stream.close() if self._delete: - os.remove(self.path) + shutil.rmtree(self._workdir) self._delete = False super(TempHandler, self).close() finally: diff --git a/certbot/main.py b/certbot/main.py index 50470438f..fc91aca5f 100644 --- a/certbot/main.py +++ b/certbot/main.py @@ -1299,18 +1299,14 @@ def make_or_verify_needed_dirs(config): :rtype: None """ - util.set_up_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE, - misc.os_geteuid(), config.strict_permissions) - util.set_up_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE, - misc.os_geteuid(), config.strict_permissions) + util.set_up_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE, config.strict_permissions) + util.set_up_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE, config.strict_permissions) hook_dirs = (config.renewal_pre_hooks_dir, config.renewal_deploy_hooks_dir, config.renewal_post_hooks_dir,) for hook_dir in hook_dirs: - util.make_or_verify_dir(hook_dir, - uid=misc.os_geteuid(), - strict=config.strict_permissions) + util.make_or_verify_dir(hook_dir, strict=config.strict_permissions) def set_displayer(config): diff --git a/certbot/plugins/webroot.py b/certbot/plugins/webroot.py index f767bdf54..33e6530a1 100644 --- a/certbot/plugins/webroot.py +++ b/certbot/plugins/webroot.py @@ -24,6 +24,7 @@ from certbot.display import ops from certbot.display import util as display_util from certbot.plugins import common from certbot.plugins import util +from certbot.util import safe_open logger = logging.getLogger(__name__) @@ -207,7 +208,7 @@ to serve all files under specified web root ({0}).""" old_umask = os.umask(0o022) try: - with open(validation_path, "wb") as validation_file: + with safe_open(validation_path, mode="wb", chmod=0o644) as validation_file: validation_file.write(validation.encode()) finally: os.umask(old_umask) diff --git a/certbot/plugins/webroot_test.py b/certbot/plugins/webroot_test.py index ba9950fae..a0b701cac 100644 --- a/certbot/plugins/webroot_test.py +++ b/certbot/plugins/webroot_test.py @@ -17,7 +17,6 @@ from acme import challenges from certbot import achallenges from certbot import errors -from certbot.compat import misc from certbot.compat import os from certbot.compat import filesystem from certbot.display import util as display_util @@ -168,14 +167,14 @@ class AuthenticatorTest(unittest.TestCase): # Remove exec bit from permission check, so that it # matches the file self.auth.perform([self.achall]) - self.assertTrue(misc.compare_file_modes(os.stat(self.validation_path).st_mode, 0o644)) + self.assertTrue(filesystem.check_mode(self.validation_path, 0o644)) # Check permissions of the directories for dirpath, dirnames, _ in os.walk(self.path): for directory in dirnames: full_path = os.path.join(dirpath, directory) - self.assertTrue(misc.compare_file_modes(os.stat(full_path).st_mode, 0o755)) + self.assertTrue(filesystem.check_mode(full_path, 0o755)) parent_gid = os.stat(self.path).st_gid parent_uid = os.stat(self.path).st_uid diff --git a/certbot/reverter.py b/certbot/reverter.py index 21d33806a..f4f1c8c67 100644 --- a/certbot/reverter.py +++ b/certbot/reverter.py @@ -15,7 +15,6 @@ from certbot import constants from certbot import errors from certbot import interfaces from certbot import util -from certbot.compat import misc from certbot.compat import os from certbot.compat import filesystem @@ -68,8 +67,7 @@ class Reverter(object): self.config = config util.make_or_verify_dir( - config.backup_dir, constants.CONFIG_DIRS_MODE, misc.os_geteuid(), - self.config.strict_permissions) + config.backup_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions) def revert_temporary_config(self): """Reload users original configuration files after a temporary save. @@ -225,8 +223,7 @@ class Reverter(object): """ util.make_or_verify_dir( - cp_dir, constants.CONFIG_DIRS_MODE, misc.os_geteuid(), - self.config.strict_permissions) + cp_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions) op_fd, existing_filepaths = self._read_and_append( os.path.join(cp_dir, "FILEPATHS")) @@ -445,8 +442,7 @@ class Reverter(object): cp_dir = self.config.in_progress_dir util.make_or_verify_dir( - cp_dir, constants.CONFIG_DIRS_MODE, misc.os_geteuid(), - self.config.strict_permissions) + cp_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions) return cp_dir diff --git a/certbot/tests/cert_manager_test.py b/certbot/tests/cert_manager_test.py index b4cd946ee..83946a0f7 100644 --- a/certbot/tests/cert_manager_test.py +++ b/certbot/tests/cert_manager_test.py @@ -277,13 +277,12 @@ class SearchLineagesTest(BaseCertManagerTest): @mock.patch('certbot.storage.renewal_conf_files') @mock.patch('certbot.storage.RenewableCert') def test_cert_storage_error(self, mock_renewable_cert, mock_renewal_conf_files, - mock_make_or_verify_dir): + mock_make_or_verify_dir): mock_renewal_conf_files.return_value = ["badfile"] mock_renewable_cert.side_effect = errors.CertStorageError from certbot import cert_manager # pylint: disable=protected-access - self.assertEqual(cert_manager._search_lineages(self.config, lambda x: x, "check"), - "check") + self.assertEqual(cert_manager._search_lineages(self.config, lambda x: x, "check"), "check") self.assertTrue(mock_make_or_verify_dir.called) @@ -294,33 +293,28 @@ class LineageForCertnameTest(BaseCertManagerTest): @mock.patch('certbot.storage.renewal_file_for_certname') @mock.patch('certbot.storage.RenewableCert') def test_found_match(self, mock_renewable_cert, mock_renewal_conf_file, - mock_make_or_verify_dir): + mock_make_or_verify_dir): mock_renewal_conf_file.return_value = "somefile.conf" mock_match = mock.Mock(lineagename="example.com") mock_renewable_cert.return_value = mock_match from certbot import cert_manager - self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), - mock_match) + self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), mock_match) self.assertTrue(mock_make_or_verify_dir.called) @mock.patch('certbot.util.make_or_verify_dir') @mock.patch('certbot.storage.renewal_file_for_certname') - def test_no_match(self, mock_renewal_conf_file, - mock_make_or_verify_dir): + def test_no_match(self, mock_renewal_conf_file, mock_make_or_verify_dir): mock_renewal_conf_file.return_value = "other.com.conf" from certbot import cert_manager - self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), - None) + self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), None) self.assertTrue(mock_make_or_verify_dir.called) @mock.patch('certbot.util.make_or_verify_dir') @mock.patch('certbot.storage.renewal_file_for_certname') - def test_no_renewal_file(self, mock_renewal_conf_file, - mock_make_or_verify_dir): + def test_no_renewal_file(self, mock_renewal_conf_file, mock_make_or_verify_dir): mock_renewal_conf_file.side_effect = errors.CertStorageError() from certbot import cert_manager - self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), - None) + self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), None) self.assertTrue(mock_make_or_verify_dir.called) @@ -331,7 +325,7 @@ class DomainsForCertnameTest(BaseCertManagerTest): @mock.patch('certbot.storage.renewal_file_for_certname') @mock.patch('certbot.storage.RenewableCert') def test_found_match(self, mock_renewable_cert, mock_renewal_conf_file, - mock_make_or_verify_dir): + mock_make_or_verify_dir): mock_renewal_conf_file.return_value = "somefile.conf" mock_match = mock.Mock(lineagename="example.com") domains = ["example.com", "example.org"] @@ -344,12 +338,10 @@ class DomainsForCertnameTest(BaseCertManagerTest): @mock.patch('certbot.util.make_or_verify_dir') @mock.patch('certbot.storage.renewal_file_for_certname') - def test_no_match(self, mock_renewal_conf_file, - mock_make_or_verify_dir): + def test_no_match(self, mock_renewal_conf_file, mock_make_or_verify_dir): mock_renewal_conf_file.return_value = "somefile.conf" from certbot import cert_manager - self.assertEqual(cert_manager.domains_for_certname(self.config, "other.com"), - None) + self.assertEqual(cert_manager.domains_for_certname(self.config, "other.com"), None) self.assertTrue(mock_make_or_verify_dir.called) diff --git a/certbot/tests/compat/filesystem_test.py b/certbot/tests/compat/filesystem_test.py index 48ac04670..90ecd13d7 100644 --- a/certbot/tests/compat/filesystem_test.py +++ b/certbot/tests/compat/filesystem_test.py @@ -16,6 +16,7 @@ except ImportError: import certbot.tests.util as test_util from certbot import lock +from certbot import util from certbot.compat import os from certbot.compat import filesystem from certbot.tests.util import TempDirTestCase @@ -306,6 +307,52 @@ class CopyOwnershipTest(test_util.TempDirTestCase): mock_chmod.assert_called_once_with(self.probe_path, 0o700) +class CheckPermissionsTest(test_util.TempDirTestCase): + def setUp(self): + super(CheckPermissionsTest, self).setUp() + self.probe_path = _create_probe(self.tempdir) + + def test_check_mode(self): + self.assertTrue(filesystem.check_mode(self.probe_path, 0o744)) + + filesystem.chmod(self.probe_path, 0o700) + self.assertFalse(filesystem.check_mode(self.probe_path, 0o744)) + + @unittest.skipIf(POSIX_MODE, reason='Test specific to Windows security') + def test_check_owner_windows(self): + self.assertTrue(filesystem.check_owner(self.probe_path)) + + system = win32security.ConvertStringSidToSid(SYSTEM_SID) + security = win32security.SECURITY_ATTRIBUTES().SECURITY_DESCRIPTOR + security.SetSecurityDescriptorOwner(system, False) + + with mock.patch('win32security.GetFileSecurity') as mock_get: + mock_get.return_value = security + self.assertFalse(filesystem.check_owner(self.probe_path)) + + @unittest.skipUnless(POSIX_MODE, reason='Test specific to Linux security') + def test_check_owner_linux(self): + self.assertTrue(filesystem.check_owner(self.probe_path)) + + import os as std_os # pylint: disable=os-module-forbidden + uid = std_os.getuid() + + with mock.patch('os.getuid') as mock_uid: + mock_uid.return_value = uid + 1 + self.assertFalse(filesystem.check_owner(self.probe_path)) + + def test_check_permissions(self): + self.assertTrue(filesystem.check_permissions(self.probe_path, 0o744)) + + with mock.patch('certbot.compat.filesystem.check_mode') as mock_mode: + mock_mode.return_value = False + self.assertFalse(filesystem.check_permissions(self.probe_path, 0o744)) + + with mock.patch('certbot.compat.filesystem.check_owner') as mock_owner: + mock_owner.return_value = False + self.assertFalse(filesystem.check_permissions(self.probe_path, 0o744)) + + class OsReplaceTest(test_util.TempDirTestCase): """Test to ensure consistent behavior of rename method""" def test_os_replace_to_existing_file(self): @@ -381,7 +428,7 @@ def _set_owner(target, security_owner, user): def _create_probe(tempdir): filesystem.chmod(tempdir, 0o744) probe_path = os.path.join(tempdir, 'probe') - open(probe_path, 'w').close() + util.safe_open(probe_path, 'w', chmod=0o744).close() return probe_path diff --git a/certbot/tests/crypto_util_test.py b/certbot/tests/crypto_util_test.py index 5c1a07f8d..2673f4219 100644 --- a/certbot/tests/crypto_util_test.py +++ b/certbot/tests/crypto_util_test.py @@ -11,6 +11,7 @@ from certbot import errors from certbot import interfaces from certbot import util from certbot.compat import os +from certbot.compat import filesystem RSA256_KEY = test_util.load_vector('rsa256_key.pem') RSA256_KEY_PATH = test_util.vector_path('rsa256_key.pem') @@ -29,6 +30,9 @@ class InitSaveKeyTest(test_util.TempDirTestCase): def setUp(self): super(InitSaveKeyTest, self).setUp() + self.workdir = os.path.join(self.tempdir, 'workdir') + filesystem.mkdir(self.workdir, mode=0o700) + logging.disable(logging.CRITICAL) zope.component.provideUtility( mock.Mock(strict_permissions=True), interfaces.IConfig) @@ -46,15 +50,15 @@ class InitSaveKeyTest(test_util.TempDirTestCase): @mock.patch('certbot.crypto_util.make_key') def test_success(self, mock_make): mock_make.return_value = b'key_pem' - key = self._call(1024, self.tempdir) + key = self._call(1024, self.workdir) self.assertEqual(key.pem, b'key_pem') self.assertTrue('key-certbot.pem' in key.file) - self.assertTrue(os.path.exists(os.path.join(self.tempdir, key.file))) + self.assertTrue(os.path.exists(os.path.join(self.workdir, key.file))) @mock.patch('certbot.crypto_util.make_key') def test_key_failure(self, mock_make): mock_make.side_effect = ValueError - self.assertRaises(ValueError, self._call, 431, self.tempdir) + self.assertRaises(ValueError, self._call, 431, self.workdir) class InitSaveCSRTest(test_util.TempDirTestCase): diff --git a/certbot/tests/log_test.py b/certbot/tests/log_test.py index d203635a2..807c7ecaa 100644 --- a/certbot/tests/log_test.py +++ b/certbot/tests/log_test.py @@ -14,7 +14,7 @@ from acme.magic_typing import Optional # pylint: disable=unused-import, no-name from certbot import constants from certbot import errors from certbot import util -from certbot.compat import misc +from certbot.compat import filesystem from certbot.compat import os from certbot.tests import util as test_util @@ -260,8 +260,7 @@ class TempHandlerTest(unittest.TestCase): self.handler.close() def test_permissions(self): - self.assertTrue( - util.check_permissions(self.handler.path, 0o600, misc.os_geteuid())) + self.assertTrue(filesystem.check_permissions(self.handler.path, 0o600)) def test_delete(self): self.handler.close() diff --git a/certbot/tests/main_test.py b/certbot/tests/main_test.py index ff8a800ab..b7477e355 100644 --- a/certbot/tests/main_test.py +++ b/certbot/tests/main_test.py @@ -31,7 +31,6 @@ from certbot import interfaces # pylint: disable=unused-import from certbot import main from certbot import updater from certbot import util -from certbot.compat import misc from certbot.compat import os from certbot.compat import filesystem from certbot.plugins import disco @@ -809,9 +808,9 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met ifaces = [] # type: List[interfaces.IPlugin] plugins = mock_disco.PluginsRegistry.find_all() - def throw_error(directory, mode, uid, strict): + def throw_error(directory, mode, strict): """Raises error.Error.""" - _, _, _, _ = directory, mode, uid, strict + _, _, _ = directory, mode, strict raise errors.Error() stdout = six.StringIO() @@ -1593,7 +1592,7 @@ class MakeOrVerifyNeededDirs(test_util.ConfigTestCase): for core_dir in (self.config.config_dir, self.config.work_dir,): mock_util.set_up_core_dir.assert_any_call( core_dir, constants.CONFIG_DIRS_MODE, - misc.os_geteuid(), self.config.strict_permissions + self.config.strict_permissions ) hook_dirs = (self.config.renewal_pre_hooks_dir, @@ -1602,8 +1601,7 @@ class MakeOrVerifyNeededDirs(test_util.ConfigTestCase): for hook_dir in hook_dirs: # default mode of 755 is used mock_util.make_or_verify_dir.assert_any_call( - hook_dir, uid=misc.os_geteuid(), - strict=self.config.strict_permissions) + hook_dir, strict=self.config.strict_permissions) class EnhanceTest(test_util.ConfigTestCase): diff --git a/certbot/tests/storage_test.py b/certbot/tests/storage_test.py index fceef804a..2bc45873c 100644 --- a/certbot/tests/storage_test.py +++ b/certbot/tests/storage_test.py @@ -13,7 +13,6 @@ import six import certbot import certbot.tests.util as test_util from certbot import errors -from certbot.compat import misc from certbot.compat import os from certbot.compat import filesystem from certbot.storage import ALL_FOUR @@ -21,7 +20,6 @@ from certbot.storage import ALL_FOUR CERT = test_util.load_cert('cert_512.pem') - def unlink_all(rc_object): """Unlink all four items associated with this RenewableCert.""" for kind in ALL_FOUR: @@ -498,7 +496,6 @@ class RenewableCertTests(BaseRenewableCertTest): self.assertTrue(self.test_rc.should_autorenew()) mock_ocsp.return_value = False - @test_util.broken_on_windows @mock.patch("certbot.storage.relevant_values") def test_save_successor(self, mock_rv): # Mock relevant_values() to claim that all values are relevant here @@ -562,7 +559,7 @@ class RenewableCertTests(BaseRenewableCertTest): self.assertFalse(os.path.islink(self.test_rc.version("privkey", 10))) self.assertFalse(os.path.exists(temp_config_file)) - @test_util.broken_on_windows + @test_util.skip_on_windows('Group/everybody permissions are not maintained on Windows.') @mock.patch("certbot.storage.relevant_values") def test_save_successor_maintains_group_mode(self, mock_rv): # Mock relevant_values() to claim that all values are relevant here @@ -571,22 +568,18 @@ class RenewableCertTests(BaseRenewableCertTest): for kind in ALL_FOUR: self._write_out_kind(kind, 1) self.test_rc.update_all_links_to(1) - self.assertTrue(misc.compare_file_modes( - os.stat(self.test_rc.version("privkey", 1)).st_mode, 0o600)) + self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 1), 0o600)) filesystem.chmod(self.test_rc.version("privkey", 1), 0o444) # If no new key, permissions should be the same (we didn't write any keys) self.test_rc.save_successor(1, b"newcert", None, b"new chain", self.config) - self.assertTrue(misc.compare_file_modes( - os.stat(self.test_rc.version("privkey", 2)).st_mode, 0o444)) + self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 2), 0o444)) # If new key, permissions should be kept as 644 self.test_rc.save_successor(2, b"newcert", b"new_privkey", b"new chain", self.config) - self.assertTrue(misc.compare_file_modes( - os.stat(self.test_rc.version("privkey", 3)).st_mode, 0o644)) + self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 3), 0o644)) # If permissions reverted, next renewal will also revert permissions of new key filesystem.chmod(self.test_rc.version("privkey", 3), 0o400) self.test_rc.save_successor(3, b"newcert", b"new_privkey", b"new chain", self.config) - self.assertTrue(misc.compare_file_modes( - os.stat(self.test_rc.version("privkey", 4)).st_mode, 0o600)) + self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 4), 0o600)) @mock.patch("certbot.storage.relevant_values") @mock.patch("certbot.storage.filesystem.copy_ownership_and_apply_mode") @@ -622,7 +615,7 @@ class RenewableCertTests(BaseRenewableCertTest): self.config.live_dir, "README"))) self.assertTrue(os.path.exists(os.path.join( self.config.live_dir, "the-lineage.com", "README"))) - self.assertTrue(misc.compare_file_modes(os.stat(result.key_path).st_mode, 0o600)) + self.assertTrue(filesystem.check_mode(result.key_path, 0o600)) with open(result.fullchain, "rb") as f: self.assertEqual(f.read(), b"cert" + b"chain") # Let's do it again and make sure it makes a different lineage diff --git a/certbot/tests/util_test.py b/certbot/tests/util_test.py index b96f8afd2..e65de3349 100644 --- a/certbot/tests/util_test.py +++ b/certbot/tests/util_test.py @@ -9,7 +9,6 @@ from six.moves import reload_module # pylint: disable=import-error import certbot.tests.util as test_util from certbot import errors -from certbot.compat import misc from certbot.compat import os from certbot.compat import filesystem @@ -120,15 +119,14 @@ class SetUpCoreDirTest(test_util.TempDirTestCase): @mock.patch('certbot.util.lock_dir_until_exit') def test_success(self, mock_lock): new_dir = os.path.join(self.tempdir, 'new') - self._call(new_dir, 0o700, misc.os_geteuid(), False) + self._call(new_dir, 0o700, False) self.assertTrue(os.path.exists(new_dir)) self.assertEqual(mock_lock.call_count, 1) @mock.patch('certbot.util.make_or_verify_dir') def test_failure(self, mock_make_or_verify): mock_make_or_verify.side_effect = OSError - self.assertRaises(errors.Error, self._call, - self.tempdir, 0o700, misc.os_geteuid(), False) + self.assertRaises(errors.Error, self._call, self.tempdir, 0o700, False) class MakeOrVerifyDirTest(test_util.TempDirTestCase): @@ -145,23 +143,20 @@ class MakeOrVerifyDirTest(test_util.TempDirTestCase): self.path = os.path.join(self.tempdir, "foo") filesystem.mkdir(self.path, 0o600) - self.uid = misc.os_geteuid() - def _call(self, directory, mode): from certbot.util import make_or_verify_dir - return make_or_verify_dir(directory, mode, self.uid, strict=True) + return make_or_verify_dir(directory, mode, strict=True) def test_creates_dir_when_missing(self): path = os.path.join(self.tempdir, "bar") self._call(path, 0o650) self.assertTrue(os.path.isdir(path)) - self.assertTrue(misc.compare_file_modes(os.stat(path).st_mode, 0o650)) + self.assertTrue(filesystem.check_mode(path, 0o650)) def test_existing_correct_mode_does_not_fail(self): self._call(self.path, 0o600) - self.assertTrue(misc.compare_file_modes(os.stat(self.path).st_mode, 0o600)) + self.assertTrue(filesystem.check_mode(self.path, 0o600)) - @test_util.skip_on_windows('Umask modes are mostly ignored on Windows.') def test_existing_wrong_mode_fails(self): self.assertRaises(errors.Error, self._call, self.path, 0o400) @@ -171,39 +166,6 @@ class MakeOrVerifyDirTest(test_util.TempDirTestCase): self.assertRaises(OSError, self._call, "bar", 12312312) -class CheckPermissionsTest(test_util.TempDirTestCase): - """Tests for certbot.util.check_permissions. - - Note that it is not possible to test for a wrong file owner, - as this testing script would have to be run as root. - - """ - - def setUp(self): - super(CheckPermissionsTest, self).setUp() - - self.uid = misc.os_geteuid() - - def _call(self, mode): - from certbot.util import check_permissions - return check_permissions(self.tempdir, mode, self.uid) - - def test_ok_mode(self): - filesystem.chmod(self.tempdir, 0o600) - self.assertTrue(self._call(0o600)) - - # TODO: reactivate the test when all logic from windows file permissions is merged. - @test_util.broken_on_windows - def test_wrong_mode(self): - filesystem.chmod(self.tempdir, 0o400) - try: - self.assertFalse(self._call(0o600)) - finally: - # Without proper write permissions, Windows is unable to delete a folder, - # even with admin permissions. Write access must be explicitly set first. - filesystem.chmod(self.tempdir, 0o700) - - class UniqueFileTest(test_util.TempDirTestCase): """Tests for certbot.util.unique_file.""" @@ -226,8 +188,8 @@ class UniqueFileTest(test_util.TempDirTestCase): def test_right_mode(self): fd1, name1 = self._call(0o700) fd2, name2 = self._call(0o600) - self.assertTrue(misc.compare_file_modes(0o700, os.stat(name1).st_mode)) - self.assertTrue(misc.compare_file_modes(0o600, os.stat(name2).st_mode)) + self.assertTrue(filesystem.check_mode(name1, 0o700)) + self.assertTrue(filesystem.check_mode(name2, 0o600)) fd1.close() fd2.close() diff --git a/certbot/util.py b/certbot/util.py index dfdc87346..8f553e51a 100644 --- a/certbot/util.py +++ b/certbot/util.py @@ -21,7 +21,6 @@ from acme.magic_typing import Tuple, Union # pylint: disable=unused-import, no- from certbot import constants from certbot import errors from certbot import lock -from certbot.compat import misc from certbot.compat import os from certbot.compat import filesystem @@ -144,12 +143,11 @@ def _release_locks(): _LOCKS.clear() -def set_up_core_dir(directory, mode, uid, strict): +def set_up_core_dir(directory, mode, strict): """Ensure directory exists with proper permissions and is locked. :param str directory: Path to a directory. :param int mode: Directory mode. - :param int uid: Directory owner. :param bool strict: require directory to be owned by current user :raises .errors.LockError: if the directory cannot be locked @@ -157,19 +155,18 @@ def set_up_core_dir(directory, mode, uid, strict): """ try: - make_or_verify_dir(directory, mode, uid, strict) + make_or_verify_dir(directory, mode, strict) lock_dir_until_exit(directory) except OSError as error: logger.debug("Exception was:", exc_info=True) raise errors.Error(PERM_ERR_FMT.format(error)) -def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False): +def make_or_verify_dir(directory, mode=0o755, strict=False): """Make sure directory exists with proper permissions. :param str directory: Path to a directory. :param int mode: Directory mode. - :param int uid: Directory owner. :param bool strict: require directory to be owned by current user :raises .errors.Error: if a directory already exists, @@ -184,29 +181,14 @@ def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False): filesystem.makedirs(directory, mode) except OSError as exception: if exception.errno == errno.EEXIST: - if strict and not check_permissions(directory, mode, uid): + if strict and not filesystem.check_permissions(directory, mode): raise errors.Error( - "%s exists, but it should be owned by user %d with" - "permissions %s" % (directory, uid, oct(mode))) + "%s exists, but it should be owned by current user with" + " permissions %s" % (directory, oct(mode))) else: raise -def check_permissions(filepath, mode, uid=0): - """Check file or directory permissions. - - :param str filepath: Path to the tested file (or directory). - :param int mode: Expected file mode. - :param int uid: Expected file owner. - - :returns: True if `mode` and `uid` match, False otherwise. - :rtype: bool - - """ - file_stat = os.stat(filepath) - return misc.compare_file_modes(file_stat.st_mode, mode) and file_stat.st_uid == uid - - def safe_open(path, mode="w", chmod=None): """Safely open a file.