[Windows] Security model for files permissions - STEP 3f (#7233)

* Correct file permissions on TempHandler

* Forbid os.chown and os.geteuid, as theses functions can be harmful to the security model on Windows.

* Implement copy_ownership

* Apply copy_ownership

* Correct webroot tests (and activate another broken test !)

* Correct lint and mypy

* Ensure to apply mode in makedirs

* Apply strict permissions on directories created with tempfile.mkdtemp(), like on Unix.

* Ensure streamHandler has 0600 on Windows

* Reactivate a test on windows

* Pin oldest requirements to current internal libraries (acme and certbot)

* Add dynamically pywin32 in dependencies: always except for certbot-oldest to avoid to break the relevant tests.

* Administrative privileges are always required.

* Correct security implementation (not the logic yet)

* First correction. Allow to manipulate finely file permissions during their generation

* Align to master + fix lint + resolve correctly symbolic links

* Add a test for windows about default paths

* Strenghthen the detection of Linux/Windows to check the standard files layout.

* Fix lint and mypy

* Reflect non usage of cache discovery from dns google plugin to its tests, solving Windows tests on the way

* Apply suggestions from code review

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Add more details in a comment

* Retrigger build.

* Add documentation.

* Fix a test

* Correct RW clear down

* Update util.py

* Remove unused code

* Fix code style

* Adapt certbot coverage threshold on Linux due to Windows specific LOC addition.

* Various optimizations around file owner and file mode

* Fix last error

* Fix copy_ownership_and_apply_mode

* Fix lint

* Correct mypy

* Extract out first part from windows-file-permissions

* Ignore new_compat in coverage for now

* Create test package for compat

* Add unit tests for security module.

* Add pywin32

* Adapt linux coverages to the windows-specific LOCs added

* Clean imports

* Correct import

* Trigger CI

* Reactivate a test

* Create the certbot.compat package. Move logic in certbot.compat.misc

* Clean comment

* Add doc

* Fix lint

* Correct mypy

* Add executable permissions

* Add the delegate certbot.compat.os module, add check coding style to enforce usage of certbot.compat.os instead of standard os

* Load certbot.compat.os instead of os

* Move existing compat test

* Update local oldest requirements

* Import sys

* Fix some mocks

* Update account_test.py

* Update os.py

* Update os.py

* Update local oldest requirements

* Implement the new linter_plugin

* Fix remaining linting errors

* Fix local oldest for nginx

* Remove custom check in favor of pylint plugin

* Remove check coding style

* Update linter_plugin.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Add several comments

* Update the setup.py

* Add documentation

* Update acme dependencies

* Update certbot/compat/os.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot/compat/os.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot/compat/os.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update docs/contributing.rst

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update linter_plugin.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update linter_plugin.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update docs/contributing.rst

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update docs/contributing.rst

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Corrections

* Handle os.path. Simplify checker.

* Add a comment to a reference implementation

* Update changelog

* Fix module registering

* Update docs/contributing.rst

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update docs/contributing.rst

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update docs/contributing.rst

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update config and changelog

* Correction

* Correct os

* Fix merge

* Disable pylint checks

* Normalize imports

* Simplify security

* Corrections

* Reorganize module

* Clean code

* Clean code

* Remove coverage

* No cover

* Implement security.chmod

* Disable a test for now

* Disable hard error for now

* Add a first test. Remove unused import

* Recalibrate coverage

* Modifications for misc

* Correct function call

* Add some types

* Remove newline

* Use os_rename

* Implement security.open

* Revert to windows-files-permissions approach

* Fix lint

* Implement security.mkdir and security.makedirs

* Fix lint

* Clean lint

* Clean lint

* Revert "Clean lint"

This reverts commit 83bf81960a.

* Correct mock

* Conditionally add pywin32 on setuptools versions that support environment markers.

* Fix separator

* Fix separator

* Rename security into filesystem

* Change module security to filesystem

* Move rename into filesystem

* Rename security into filesystem

* Rename security into filesystem

* Rerun CI

* Fix import

* Fix pylint

* Implement copy_ownership_and_apply_mode

* Fix pylint

* Update certbot/compat/os.py

Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>

* Remove default values

* Rewrite a comment.

* Relaunch CI

* Pass as keyword arguments

* Update certbot/compat/filesystem.py

Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>

* Update certbot/compat/filesystem.py

Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>

* Update certbot/compat/filesystem.py

Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>

* Make the private key permissions transfer platform specific

* Update certbot/compat/filesystem.py

Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>

* Rename variable

* Fix comment0

* Add unit test for copy_ownership_and_apply_mode

* Adapt coverage

* Implement new methods.

* Remove the old method

* Reimplement make_or_verify_dir

* Finish migration

* Start to fix tests

* Fix ownership when creating a file with filesystem.open

* Fix security on TempHandler

* Fix validation path permissions

* Fix owner on mkdir

* Use a proper workdir for crypto tests

* Fix pylint

* Adapt coverage

* Update storage_test.py

* Update util_test.py

* Clean code

* Update certbot/compat/filesystem.py

Co-Authored-By: ohemorange <ebportnoy@gmail.com>

* Add comment

* Update certbot/compat/filesystem.py

Co-Authored-By: ohemorange <ebportnoy@gmail.com>

* Check permissions

* Change test mode

* Add unit test for filesystem.check_* functions

* Update filesystem_test.py

* Better logic for TempHandler

* Adapt coverage
This commit is contained in:
Adrien Ferrand 2019-07-26 00:25:36 +02:00 committed by GitHub
parent 40da709792
commit e6bf3fe7f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 194 additions and 191 deletions

View file

@ -6,13 +6,13 @@ coverage:
flags: linux
# Fixed target instead of auto set by #7173, can
# be removed when flags in Codecov are added back.
target: 97.6
target: 97.5
threshold: 0.1
base: auto
windows:
flags: windows
# Fixed target instead of auto set by #7173, can
# be removed when flags in Codecov are added back.
target: 97.0
target: 97.2
threshold: 0.1
base: auto

View file

@ -20,7 +20,6 @@ from certbot import crypto_util
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import misc
from certbot.compat import os
from certbot.plugins import common
@ -903,13 +902,9 @@ class NginxConfigurator(common.Installer):
have permissions of root.
"""
uid = misc.os_geteuid()
util.make_or_verify_dir(
self.config.work_dir, core_constants.CONFIG_DIRS_MODE, uid)
util.make_or_verify_dir(
self.config.backup_dir, core_constants.CONFIG_DIRS_MODE, uid)
util.make_or_verify_dir(
self.config.config_dir, core_constants.CONFIG_DIRS_MODE, uid)
util.make_or_verify_dir(self.config.work_dir, core_constants.CONFIG_DIRS_MODE)
util.make_or_verify_dir(self.config.backup_dir, core_constants.CONFIG_DIRS_MODE)
util.make_or_verify_dir(self.config.config_dir, core_constants.CONFIG_DIRS_MODE)
def get_version(self):
"""Return version of Nginx Server.

View file

@ -20,7 +20,6 @@ from certbot import constants
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import misc
from certbot.compat import os
logger = logging.getLogger(__name__)
@ -139,8 +138,7 @@ class AccountFileStorage(interfaces.AccountStorage):
"""
def __init__(self, config):
self.config = config
util.make_or_verify_dir(config.accounts_dir, 0o700, misc.os_geteuid(),
self.config.strict_permissions)
util.make_or_verify_dir(config.accounts_dir, 0o700, self.config.strict_permissions)
def _account_dir_path(self, account_id):
return self._account_dir_path_for_server_path(account_id, self.config.server_path)
@ -322,8 +320,7 @@ class AccountFileStorage(interfaces.AccountStorage):
def _save(self, account, acme, regr_only):
account_dir_path = self._account_dir_path(account.id)
util.make_or_verify_dir(account_dir_path, 0o700, misc.os_geteuid(),
self.config.strict_permissions)
util.make_or_verify_dir(account_dir_path, 0o700, self.config.strict_permissions)
try:
with open(self._regr_path(account_dir_path), "w") as regr_file:
regr = account.regr

View file

@ -15,7 +15,6 @@ from certbot import interfaces
from certbot import ocsp
from certbot import storage
from certbot import util
from certbot.compat import misc
from certbot.compat import os
from certbot.display import util as display_util
@ -106,7 +105,7 @@ def lineage_for_certname(cli_config, certname):
"""Find a lineage object with name certname."""
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
util.make_or_verify_dir(configs_dir, mode=0o755, uid=misc.os_geteuid())
util.make_or_verify_dir(configs_dir, mode=0o755)
try:
renewal_file = storage.renewal_file_for_certname(cli_config, certname)
except errors.CertStorageError:
@ -375,7 +374,7 @@ def _search_lineages(cli_config, func, initial_rv, *args):
"""
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
util.make_or_verify_dir(configs_dir, mode=0o755, uid=misc.os_geteuid())
util.make_or_verify_dir(configs_dir, mode=0o755)
rv = initial_rv
for renewal_file in storage.renewal_conf_files(cli_config):

View file

@ -30,7 +30,6 @@ from certbot import interfaces
from certbot import reverter
from certbot import storage
from certbot import util
from certbot.compat import misc
from certbot.compat import os
from certbot.display import enhancements
from certbot.display import ops as display_ops
@ -459,9 +458,7 @@ class Client(object):
"""
for path in cert_path, chain_path, fullchain_path:
util.make_or_verify_dir(
os.path.dirname(path), 0o755, misc.os_geteuid(),
self.config.strict_permissions)
util.make_or_verify_dir(os.path.dirname(path), 0o755, self.config.strict_permissions)
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)

View file

@ -77,6 +77,54 @@ def copy_ownership_and_apply_mode(src, dst, mode, copy_user, copy_group):
chmod(dst, mode)
def check_mode(file_path, mode):
# type: (str, int) -> bool
"""
Check if the given mode matches the permissions of the given file.
On Linux, will make a direct comparison, on Windows, mode will be compared against
the security model.
:param str file_path: Path of the file
:param int mode: POSIX mode to test
:rtype: bool
:return: True if the POSIX mode matches the file permissions
"""
if POSIX_MODE:
return stat.S_IMODE(os.stat(file_path).st_mode) == mode
return _check_win_mode(file_path, mode)
def check_owner(file_path):
# type: (str) -> bool
"""
Check if given file is owned by current user.
:param str file_path: File path to check
:rtype: bool
:return: True if given file is owned by current user, False otherwise.
"""
if POSIX_MODE:
return os.stat(file_path).st_uid == os.getuid()
# Get owner sid of the file
security = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION)
user = security.GetSecurityDescriptorOwner()
# Compare sids
return _get_current_user() == user
def check_permissions(file_path, mode):
# type: (str, int) -> bool
"""
Check if given file has the given mode and is owned by current user.
:param str file_path: File path to check
:param int mode: POSIX mode to check
:rtype: bool
:return: True if file has correct mode and owner, False otherwise.
"""
return check_owner(file_path) and check_mode(file_path, mode)
def open(file_path, flags, mode=0o777): # pylint: disable=redefined-builtin
# type: (str, int, int) -> int
"""
@ -107,6 +155,10 @@ def open(file_path, flags, mode=0o777): # pylint: disable=redefined-builtin
security = attributes.SECURITY_DESCRIPTOR
user = _get_current_user()
dacl = _generate_dacl(user, mode)
# We set second parameter to 0 (`False`) to say that this security descriptor is
# NOT constructed from a default mechanism, but is explicitly set by the user.
# See https://docs.microsoft.com/en-us/windows/desktop/api/securitybaseapi/nf-securitybaseapi-setsecuritydescriptorowner # pylint: disable=line-too-long
security.SetSecurityDescriptorOwner(user, 0)
# We set first parameter to 1 (`True`) to say that this security descriptor contains
# a DACL. Otherwise second and third parameters are ignored.
# We set third parameter to 0 (`False`) to say that this security descriptor is
@ -177,6 +229,7 @@ def mkdir(file_path, mode=0o777):
security = attributes.SECURITY_DESCRIPTOR
user = _get_current_user()
dacl = _generate_dacl(user, mode)
security.SetSecurityDescriptorOwner(user, False)
security.SetSecurityDescriptorDacl(1, dacl, 0)
try:
@ -353,6 +406,28 @@ def _generate_windows_flags(rights_desc):
return flag
def _check_win_mode(file_path, mode):
# Resolve symbolic links
file_path = realpath(file_path)
# Get current dacl file
security = win32security.GetFileSecurity(file_path, win32security.OWNER_SECURITY_INFORMATION
| win32security.DACL_SECURITY_INFORMATION)
dacl = security.GetSecurityDescriptorDacl()
# Get current file owner sid
user = security.GetSecurityDescriptorOwner()
if not dacl:
# No DACL means full control to everyone
# This is not a deterministic permissions set.
return False
# Calculate the target dacl
ref_dacl = _generate_dacl(user, mode)
return _compare_dacls(dacl, ref_dacl)
def _compare_dacls(dacl1, dacl2):
"""
This method compare the two given DACLs to check if they are identical.

View file

@ -42,22 +42,6 @@ def raise_for_non_administrative_windows_rights():
raise errors.Error('Error, certbot must be run on a shell with administrative rights.')
def os_geteuid():
"""
Get current user uid
:returns: The current user uid.
:rtype: int
"""
try:
# Linux specific
return os.geteuid()
except AttributeError:
# Windows specific
return 0
def readline_with_timeout(timeout, prompt):
# type: (float, str) -> str
"""
@ -88,16 +72,6 @@ def readline_with_timeout(timeout, prompt):
return sys.stdin.readline()
def compare_file_modes(mode1, mode2):
"""Return true if the two modes can be considered as equals for this platform"""
if os.name != 'nt':
# Linux specific: standard compare
return oct(stat.S_IMODE(mode1)) == oct(stat.S_IMODE(mode2))
# Windows specific: most of mode bits are ignored on Windows. Only check user R/W rights.
return (stat.S_IMODE(mode1) & stat.S_IREAD == stat.S_IMODE(mode2) & stat.S_IREAD
and stat.S_IMODE(mode1) & stat.S_IWRITE == stat.S_IMODE(mode2) & stat.S_IWRITE)
WINDOWS_DEFAULT_FOLDERS = {
'config': 'C:\\Certbot',
'work': 'C:\\Certbot\\lib',

View file

@ -28,7 +28,6 @@ from acme.magic_typing import IO # pylint: disable=unused-import, no-name-in-mo
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import misc
from certbot.compat import os
logger = logging.getLogger(__name__)
@ -61,8 +60,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"):
config = zope.component.getUtility(interfaces.IConfig)
# Save file
util.make_or_verify_dir(key_dir, 0o700, misc.os_geteuid(),
config.strict_permissions)
util.make_or_verify_dir(key_dir, 0o700, config.strict_permissions)
key_f, key_path = util.unique_file(
os.path.join(key_dir, keyname), 0o600, "wb")
with key_f:
@ -92,8 +90,7 @@ def init_save_csr(privkey, names, path):
privkey.pem, names, must_staple=config.must_staple)
# Save CSR
util.make_or_verify_dir(path, 0o755, misc.os_geteuid(),
config.strict_permissions)
util.make_or_verify_dir(path, 0o755, config.strict_permissions)
csr_f, csr_filename = util.unique_file(
os.path.join(path, "csr-certbot.pem"), 0o644, "wb")
with csr_f:

View file

@ -17,6 +17,7 @@ from __future__ import print_function
import functools
import logging
import logging.handlers
import shutil
import sys
import tempfile
import traceback
@ -26,7 +27,6 @@ from acme import messages
from certbot import constants
from certbot import errors
from certbot import util
from certbot.compat import misc
from certbot.compat import os
# Logging format
@ -134,8 +134,7 @@ def setup_log_file_handler(config, logfile, fmt):
"""
# TODO: logs might contain sensitive data such as contents of the
# private key! #525
util.set_up_core_dir(
config.logs_dir, 0o700, misc.os_geteuid(), config.strict_permissions)
util.set_up_core_dir(config.logs_dir, 0o700, config.strict_permissions)
log_file_path = os.path.join(config.logs_dir, logfile)
try:
handler = logging.handlers.RotatingFileHandler(
@ -240,9 +239,10 @@ class TempHandler(logging.StreamHandler):
"""
def __init__(self):
stream = tempfile.NamedTemporaryFile('w', delete=False)
self._workdir = tempfile.mkdtemp()
self.path = os.path.join(self._workdir, 'log')
stream = util.safe_open(self.path, mode='w', chmod=0o600)
super(TempHandler, self).__init__(stream)
self.path = stream.name
self._delete = True
def emit(self, record):
@ -266,7 +266,7 @@ class TempHandler(logging.StreamHandler):
# stream like stderr to be used
self.stream.close()
if self._delete:
os.remove(self.path)
shutil.rmtree(self._workdir)
self._delete = False
super(TempHandler, self).close()
finally:

View file

@ -1299,18 +1299,14 @@ def make_or_verify_needed_dirs(config):
:rtype: None
"""
util.set_up_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE,
misc.os_geteuid(), config.strict_permissions)
util.set_up_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE,
misc.os_geteuid(), config.strict_permissions)
util.set_up_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE, config.strict_permissions)
util.set_up_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE, config.strict_permissions)
hook_dirs = (config.renewal_pre_hooks_dir,
config.renewal_deploy_hooks_dir,
config.renewal_post_hooks_dir,)
for hook_dir in hook_dirs:
util.make_or_verify_dir(hook_dir,
uid=misc.os_geteuid(),
strict=config.strict_permissions)
util.make_or_verify_dir(hook_dir, strict=config.strict_permissions)
def set_displayer(config):

View file

@ -24,6 +24,7 @@ from certbot.display import ops
from certbot.display import util as display_util
from certbot.plugins import common
from certbot.plugins import util
from certbot.util import safe_open
logger = logging.getLogger(__name__)
@ -207,7 +208,7 @@ to serve all files under specified web root ({0})."""
old_umask = os.umask(0o022)
try:
with open(validation_path, "wb") as validation_file:
with safe_open(validation_path, mode="wb", chmod=0o644) as validation_file:
validation_file.write(validation.encode())
finally:
os.umask(old_umask)

View file

@ -17,7 +17,6 @@ from acme import challenges
from certbot import achallenges
from certbot import errors
from certbot.compat import misc
from certbot.compat import os
from certbot.compat import filesystem
from certbot.display import util as display_util
@ -168,14 +167,14 @@ class AuthenticatorTest(unittest.TestCase):
# Remove exec bit from permission check, so that it
# matches the file
self.auth.perform([self.achall])
self.assertTrue(misc.compare_file_modes(os.stat(self.validation_path).st_mode, 0o644))
self.assertTrue(filesystem.check_mode(self.validation_path, 0o644))
# Check permissions of the directories
for dirpath, dirnames, _ in os.walk(self.path):
for directory in dirnames:
full_path = os.path.join(dirpath, directory)
self.assertTrue(misc.compare_file_modes(os.stat(full_path).st_mode, 0o755))
self.assertTrue(filesystem.check_mode(full_path, 0o755))
parent_gid = os.stat(self.path).st_gid
parent_uid = os.stat(self.path).st_uid

View file

@ -15,7 +15,6 @@ from certbot import constants
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import misc
from certbot.compat import os
from certbot.compat import filesystem
@ -68,8 +67,7 @@ class Reverter(object):
self.config = config
util.make_or_verify_dir(
config.backup_dir, constants.CONFIG_DIRS_MODE, misc.os_geteuid(),
self.config.strict_permissions)
config.backup_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions)
def revert_temporary_config(self):
"""Reload users original configuration files after a temporary save.
@ -225,8 +223,7 @@ class Reverter(object):
"""
util.make_or_verify_dir(
cp_dir, constants.CONFIG_DIRS_MODE, misc.os_geteuid(),
self.config.strict_permissions)
cp_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions)
op_fd, existing_filepaths = self._read_and_append(
os.path.join(cp_dir, "FILEPATHS"))
@ -445,8 +442,7 @@ class Reverter(object):
cp_dir = self.config.in_progress_dir
util.make_or_verify_dir(
cp_dir, constants.CONFIG_DIRS_MODE, misc.os_geteuid(),
self.config.strict_permissions)
cp_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions)
return cp_dir

View file

@ -277,13 +277,12 @@ class SearchLineagesTest(BaseCertManagerTest):
@mock.patch('certbot.storage.renewal_conf_files')
@mock.patch('certbot.storage.RenewableCert')
def test_cert_storage_error(self, mock_renewable_cert, mock_renewal_conf_files,
mock_make_or_verify_dir):
mock_make_or_verify_dir):
mock_renewal_conf_files.return_value = ["badfile"]
mock_renewable_cert.side_effect = errors.CertStorageError
from certbot import cert_manager
# pylint: disable=protected-access
self.assertEqual(cert_manager._search_lineages(self.config, lambda x: x, "check"),
"check")
self.assertEqual(cert_manager._search_lineages(self.config, lambda x: x, "check"), "check")
self.assertTrue(mock_make_or_verify_dir.called)
@ -294,33 +293,28 @@ class LineageForCertnameTest(BaseCertManagerTest):
@mock.patch('certbot.storage.renewal_file_for_certname')
@mock.patch('certbot.storage.RenewableCert')
def test_found_match(self, mock_renewable_cert, mock_renewal_conf_file,
mock_make_or_verify_dir):
mock_make_or_verify_dir):
mock_renewal_conf_file.return_value = "somefile.conf"
mock_match = mock.Mock(lineagename="example.com")
mock_renewable_cert.return_value = mock_match
from certbot import cert_manager
self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"),
mock_match)
self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), mock_match)
self.assertTrue(mock_make_or_verify_dir.called)
@mock.patch('certbot.util.make_or_verify_dir')
@mock.patch('certbot.storage.renewal_file_for_certname')
def test_no_match(self, mock_renewal_conf_file,
mock_make_or_verify_dir):
def test_no_match(self, mock_renewal_conf_file, mock_make_or_verify_dir):
mock_renewal_conf_file.return_value = "other.com.conf"
from certbot import cert_manager
self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"),
None)
self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), None)
self.assertTrue(mock_make_or_verify_dir.called)
@mock.patch('certbot.util.make_or_verify_dir')
@mock.patch('certbot.storage.renewal_file_for_certname')
def test_no_renewal_file(self, mock_renewal_conf_file,
mock_make_or_verify_dir):
def test_no_renewal_file(self, mock_renewal_conf_file, mock_make_or_verify_dir):
mock_renewal_conf_file.side_effect = errors.CertStorageError()
from certbot import cert_manager
self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"),
None)
self.assertEqual(cert_manager.lineage_for_certname(self.config, "example.com"), None)
self.assertTrue(mock_make_or_verify_dir.called)
@ -331,7 +325,7 @@ class DomainsForCertnameTest(BaseCertManagerTest):
@mock.patch('certbot.storage.renewal_file_for_certname')
@mock.patch('certbot.storage.RenewableCert')
def test_found_match(self, mock_renewable_cert, mock_renewal_conf_file,
mock_make_or_verify_dir):
mock_make_or_verify_dir):
mock_renewal_conf_file.return_value = "somefile.conf"
mock_match = mock.Mock(lineagename="example.com")
domains = ["example.com", "example.org"]
@ -344,12 +338,10 @@ class DomainsForCertnameTest(BaseCertManagerTest):
@mock.patch('certbot.util.make_or_verify_dir')
@mock.patch('certbot.storage.renewal_file_for_certname')
def test_no_match(self, mock_renewal_conf_file,
mock_make_or_verify_dir):
def test_no_match(self, mock_renewal_conf_file, mock_make_or_verify_dir):
mock_renewal_conf_file.return_value = "somefile.conf"
from certbot import cert_manager
self.assertEqual(cert_manager.domains_for_certname(self.config, "other.com"),
None)
self.assertEqual(cert_manager.domains_for_certname(self.config, "other.com"), None)
self.assertTrue(mock_make_or_verify_dir.called)

View file

@ -16,6 +16,7 @@ except ImportError:
import certbot.tests.util as test_util
from certbot import lock
from certbot import util
from certbot.compat import os
from certbot.compat import filesystem
from certbot.tests.util import TempDirTestCase
@ -306,6 +307,52 @@ class CopyOwnershipTest(test_util.TempDirTestCase):
mock_chmod.assert_called_once_with(self.probe_path, 0o700)
class CheckPermissionsTest(test_util.TempDirTestCase):
def setUp(self):
super(CheckPermissionsTest, self).setUp()
self.probe_path = _create_probe(self.tempdir)
def test_check_mode(self):
self.assertTrue(filesystem.check_mode(self.probe_path, 0o744))
filesystem.chmod(self.probe_path, 0o700)
self.assertFalse(filesystem.check_mode(self.probe_path, 0o744))
@unittest.skipIf(POSIX_MODE, reason='Test specific to Windows security')
def test_check_owner_windows(self):
self.assertTrue(filesystem.check_owner(self.probe_path))
system = win32security.ConvertStringSidToSid(SYSTEM_SID)
security = win32security.SECURITY_ATTRIBUTES().SECURITY_DESCRIPTOR
security.SetSecurityDescriptorOwner(system, False)
with mock.patch('win32security.GetFileSecurity') as mock_get:
mock_get.return_value = security
self.assertFalse(filesystem.check_owner(self.probe_path))
@unittest.skipUnless(POSIX_MODE, reason='Test specific to Linux security')
def test_check_owner_linux(self):
self.assertTrue(filesystem.check_owner(self.probe_path))
import os as std_os # pylint: disable=os-module-forbidden
uid = std_os.getuid()
with mock.patch('os.getuid') as mock_uid:
mock_uid.return_value = uid + 1
self.assertFalse(filesystem.check_owner(self.probe_path))
def test_check_permissions(self):
self.assertTrue(filesystem.check_permissions(self.probe_path, 0o744))
with mock.patch('certbot.compat.filesystem.check_mode') as mock_mode:
mock_mode.return_value = False
self.assertFalse(filesystem.check_permissions(self.probe_path, 0o744))
with mock.patch('certbot.compat.filesystem.check_owner') as mock_owner:
mock_owner.return_value = False
self.assertFalse(filesystem.check_permissions(self.probe_path, 0o744))
class OsReplaceTest(test_util.TempDirTestCase):
"""Test to ensure consistent behavior of rename method"""
def test_os_replace_to_existing_file(self):
@ -381,7 +428,7 @@ def _set_owner(target, security_owner, user):
def _create_probe(tempdir):
filesystem.chmod(tempdir, 0o744)
probe_path = os.path.join(tempdir, 'probe')
open(probe_path, 'w').close()
util.safe_open(probe_path, 'w', chmod=0o744).close()
return probe_path

View file

@ -11,6 +11,7 @@ from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import os
from certbot.compat import filesystem
RSA256_KEY = test_util.load_vector('rsa256_key.pem')
RSA256_KEY_PATH = test_util.vector_path('rsa256_key.pem')
@ -29,6 +30,9 @@ class InitSaveKeyTest(test_util.TempDirTestCase):
def setUp(self):
super(InitSaveKeyTest, self).setUp()
self.workdir = os.path.join(self.tempdir, 'workdir')
filesystem.mkdir(self.workdir, mode=0o700)
logging.disable(logging.CRITICAL)
zope.component.provideUtility(
mock.Mock(strict_permissions=True), interfaces.IConfig)
@ -46,15 +50,15 @@ class InitSaveKeyTest(test_util.TempDirTestCase):
@mock.patch('certbot.crypto_util.make_key')
def test_success(self, mock_make):
mock_make.return_value = b'key_pem'
key = self._call(1024, self.tempdir)
key = self._call(1024, self.workdir)
self.assertEqual(key.pem, b'key_pem')
self.assertTrue('key-certbot.pem' in key.file)
self.assertTrue(os.path.exists(os.path.join(self.tempdir, key.file)))
self.assertTrue(os.path.exists(os.path.join(self.workdir, key.file)))
@mock.patch('certbot.crypto_util.make_key')
def test_key_failure(self, mock_make):
mock_make.side_effect = ValueError
self.assertRaises(ValueError, self._call, 431, self.tempdir)
self.assertRaises(ValueError, self._call, 431, self.workdir)
class InitSaveCSRTest(test_util.TempDirTestCase):

View file

@ -14,7 +14,7 @@ from acme.magic_typing import Optional # pylint: disable=unused-import, no-name
from certbot import constants
from certbot import errors
from certbot import util
from certbot.compat import misc
from certbot.compat import filesystem
from certbot.compat import os
from certbot.tests import util as test_util
@ -260,8 +260,7 @@ class TempHandlerTest(unittest.TestCase):
self.handler.close()
def test_permissions(self):
self.assertTrue(
util.check_permissions(self.handler.path, 0o600, misc.os_geteuid()))
self.assertTrue(filesystem.check_permissions(self.handler.path, 0o600))
def test_delete(self):
self.handler.close()

View file

@ -31,7 +31,6 @@ from certbot import interfaces # pylint: disable=unused-import
from certbot import main
from certbot import updater
from certbot import util
from certbot.compat import misc
from certbot.compat import os
from certbot.compat import filesystem
from certbot.plugins import disco
@ -809,9 +808,9 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
def throw_error(directory, mode, uid, strict):
def throw_error(directory, mode, strict):
"""Raises error.Error."""
_, _, _, _ = directory, mode, uid, strict
_, _, _ = directory, mode, strict
raise errors.Error()
stdout = six.StringIO()
@ -1593,7 +1592,7 @@ class MakeOrVerifyNeededDirs(test_util.ConfigTestCase):
for core_dir in (self.config.config_dir, self.config.work_dir,):
mock_util.set_up_core_dir.assert_any_call(
core_dir, constants.CONFIG_DIRS_MODE,
misc.os_geteuid(), self.config.strict_permissions
self.config.strict_permissions
)
hook_dirs = (self.config.renewal_pre_hooks_dir,
@ -1602,8 +1601,7 @@ class MakeOrVerifyNeededDirs(test_util.ConfigTestCase):
for hook_dir in hook_dirs:
# default mode of 755 is used
mock_util.make_or_verify_dir.assert_any_call(
hook_dir, uid=misc.os_geteuid(),
strict=self.config.strict_permissions)
hook_dir, strict=self.config.strict_permissions)
class EnhanceTest(test_util.ConfigTestCase):

View file

@ -13,7 +13,6 @@ import six
import certbot
import certbot.tests.util as test_util
from certbot import errors
from certbot.compat import misc
from certbot.compat import os
from certbot.compat import filesystem
from certbot.storage import ALL_FOUR
@ -21,7 +20,6 @@ from certbot.storage import ALL_FOUR
CERT = test_util.load_cert('cert_512.pem')
def unlink_all(rc_object):
"""Unlink all four items associated with this RenewableCert."""
for kind in ALL_FOUR:
@ -498,7 +496,6 @@ class RenewableCertTests(BaseRenewableCertTest):
self.assertTrue(self.test_rc.should_autorenew())
mock_ocsp.return_value = False
@test_util.broken_on_windows
@mock.patch("certbot.storage.relevant_values")
def test_save_successor(self, mock_rv):
# Mock relevant_values() to claim that all values are relevant here
@ -562,7 +559,7 @@ class RenewableCertTests(BaseRenewableCertTest):
self.assertFalse(os.path.islink(self.test_rc.version("privkey", 10)))
self.assertFalse(os.path.exists(temp_config_file))
@test_util.broken_on_windows
@test_util.skip_on_windows('Group/everybody permissions are not maintained on Windows.')
@mock.patch("certbot.storage.relevant_values")
def test_save_successor_maintains_group_mode(self, mock_rv):
# Mock relevant_values() to claim that all values are relevant here
@ -571,22 +568,18 @@ class RenewableCertTests(BaseRenewableCertTest):
for kind in ALL_FOUR:
self._write_out_kind(kind, 1)
self.test_rc.update_all_links_to(1)
self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 1)).st_mode, 0o600))
self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 1), 0o600))
filesystem.chmod(self.test_rc.version("privkey", 1), 0o444)
# If no new key, permissions should be the same (we didn't write any keys)
self.test_rc.save_successor(1, b"newcert", None, b"new chain", self.config)
self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 2)).st_mode, 0o444))
self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 2), 0o444))
# If new key, permissions should be kept as 644
self.test_rc.save_successor(2, b"newcert", b"new_privkey", b"new chain", self.config)
self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 3)).st_mode, 0o644))
self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 3), 0o644))
# If permissions reverted, next renewal will also revert permissions of new key
filesystem.chmod(self.test_rc.version("privkey", 3), 0o400)
self.test_rc.save_successor(3, b"newcert", b"new_privkey", b"new chain", self.config)
self.assertTrue(misc.compare_file_modes(
os.stat(self.test_rc.version("privkey", 4)).st_mode, 0o600))
self.assertTrue(filesystem.check_mode(self.test_rc.version("privkey", 4), 0o600))
@mock.patch("certbot.storage.relevant_values")
@mock.patch("certbot.storage.filesystem.copy_ownership_and_apply_mode")
@ -622,7 +615,7 @@ class RenewableCertTests(BaseRenewableCertTest):
self.config.live_dir, "README")))
self.assertTrue(os.path.exists(os.path.join(
self.config.live_dir, "the-lineage.com", "README")))
self.assertTrue(misc.compare_file_modes(os.stat(result.key_path).st_mode, 0o600))
self.assertTrue(filesystem.check_mode(result.key_path, 0o600))
with open(result.fullchain, "rb") as f:
self.assertEqual(f.read(), b"cert" + b"chain")
# Let's do it again and make sure it makes a different lineage

View file

@ -9,7 +9,6 @@ from six.moves import reload_module # pylint: disable=import-error
import certbot.tests.util as test_util
from certbot import errors
from certbot.compat import misc
from certbot.compat import os
from certbot.compat import filesystem
@ -120,15 +119,14 @@ class SetUpCoreDirTest(test_util.TempDirTestCase):
@mock.patch('certbot.util.lock_dir_until_exit')
def test_success(self, mock_lock):
new_dir = os.path.join(self.tempdir, 'new')
self._call(new_dir, 0o700, misc.os_geteuid(), False)
self._call(new_dir, 0o700, False)
self.assertTrue(os.path.exists(new_dir))
self.assertEqual(mock_lock.call_count, 1)
@mock.patch('certbot.util.make_or_verify_dir')
def test_failure(self, mock_make_or_verify):
mock_make_or_verify.side_effect = OSError
self.assertRaises(errors.Error, self._call,
self.tempdir, 0o700, misc.os_geteuid(), False)
self.assertRaises(errors.Error, self._call, self.tempdir, 0o700, False)
class MakeOrVerifyDirTest(test_util.TempDirTestCase):
@ -145,23 +143,20 @@ class MakeOrVerifyDirTest(test_util.TempDirTestCase):
self.path = os.path.join(self.tempdir, "foo")
filesystem.mkdir(self.path, 0o600)
self.uid = misc.os_geteuid()
def _call(self, directory, mode):
from certbot.util import make_or_verify_dir
return make_or_verify_dir(directory, mode, self.uid, strict=True)
return make_or_verify_dir(directory, mode, strict=True)
def test_creates_dir_when_missing(self):
path = os.path.join(self.tempdir, "bar")
self._call(path, 0o650)
self.assertTrue(os.path.isdir(path))
self.assertTrue(misc.compare_file_modes(os.stat(path).st_mode, 0o650))
self.assertTrue(filesystem.check_mode(path, 0o650))
def test_existing_correct_mode_does_not_fail(self):
self._call(self.path, 0o600)
self.assertTrue(misc.compare_file_modes(os.stat(self.path).st_mode, 0o600))
self.assertTrue(filesystem.check_mode(self.path, 0o600))
@test_util.skip_on_windows('Umask modes are mostly ignored on Windows.')
def test_existing_wrong_mode_fails(self):
self.assertRaises(errors.Error, self._call, self.path, 0o400)
@ -171,39 +166,6 @@ class MakeOrVerifyDirTest(test_util.TempDirTestCase):
self.assertRaises(OSError, self._call, "bar", 12312312)
class CheckPermissionsTest(test_util.TempDirTestCase):
"""Tests for certbot.util.check_permissions.
Note that it is not possible to test for a wrong file owner,
as this testing script would have to be run as root.
"""
def setUp(self):
super(CheckPermissionsTest, self).setUp()
self.uid = misc.os_geteuid()
def _call(self, mode):
from certbot.util import check_permissions
return check_permissions(self.tempdir, mode, self.uid)
def test_ok_mode(self):
filesystem.chmod(self.tempdir, 0o600)
self.assertTrue(self._call(0o600))
# TODO: reactivate the test when all logic from windows file permissions is merged.
@test_util.broken_on_windows
def test_wrong_mode(self):
filesystem.chmod(self.tempdir, 0o400)
try:
self.assertFalse(self._call(0o600))
finally:
# Without proper write permissions, Windows is unable to delete a folder,
# even with admin permissions. Write access must be explicitly set first.
filesystem.chmod(self.tempdir, 0o700)
class UniqueFileTest(test_util.TempDirTestCase):
"""Tests for certbot.util.unique_file."""
@ -226,8 +188,8 @@ class UniqueFileTest(test_util.TempDirTestCase):
def test_right_mode(self):
fd1, name1 = self._call(0o700)
fd2, name2 = self._call(0o600)
self.assertTrue(misc.compare_file_modes(0o700, os.stat(name1).st_mode))
self.assertTrue(misc.compare_file_modes(0o600, os.stat(name2).st_mode))
self.assertTrue(filesystem.check_mode(name1, 0o700))
self.assertTrue(filesystem.check_mode(name2, 0o600))
fd1.close()
fd2.close()

View file

@ -21,7 +21,6 @@ from acme.magic_typing import Tuple, Union # pylint: disable=unused-import, no-
from certbot import constants
from certbot import errors
from certbot import lock
from certbot.compat import misc
from certbot.compat import os
from certbot.compat import filesystem
@ -144,12 +143,11 @@ def _release_locks():
_LOCKS.clear()
def set_up_core_dir(directory, mode, uid, strict):
def set_up_core_dir(directory, mode, strict):
"""Ensure directory exists with proper permissions and is locked.
:param str directory: Path to a directory.
:param int mode: Directory mode.
:param int uid: Directory owner.
:param bool strict: require directory to be owned by current user
:raises .errors.LockError: if the directory cannot be locked
@ -157,19 +155,18 @@ def set_up_core_dir(directory, mode, uid, strict):
"""
try:
make_or_verify_dir(directory, mode, uid, strict)
make_or_verify_dir(directory, mode, strict)
lock_dir_until_exit(directory)
except OSError as error:
logger.debug("Exception was:", exc_info=True)
raise errors.Error(PERM_ERR_FMT.format(error))
def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False):
def make_or_verify_dir(directory, mode=0o755, strict=False):
"""Make sure directory exists with proper permissions.
:param str directory: Path to a directory.
:param int mode: Directory mode.
:param int uid: Directory owner.
:param bool strict: require directory to be owned by current user
:raises .errors.Error: if a directory already exists,
@ -184,29 +181,14 @@ def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False):
filesystem.makedirs(directory, mode)
except OSError as exception:
if exception.errno == errno.EEXIST:
if strict and not check_permissions(directory, mode, uid):
if strict and not filesystem.check_permissions(directory, mode):
raise errors.Error(
"%s exists, but it should be owned by user %d with"
"permissions %s" % (directory, uid, oct(mode)))
"%s exists, but it should be owned by current user with"
" permissions %s" % (directory, oct(mode)))
else:
raise
def check_permissions(filepath, mode, uid=0):
"""Check file or directory permissions.
:param str filepath: Path to the tested file (or directory).
:param int mode: Expected file mode.
:param int uid: Expected file owner.
:returns: True if `mode` and `uid` match, False otherwise.
:rtype: bool
"""
file_stat = os.stat(filepath)
return misc.compare_file_modes(file_stat.st_mode, mode) and file_stat.st_uid == uid
def safe_open(path, mode="w", chmod=None):
"""Safely open a file.