mirror of
https://github.com/certbot/certbot.git
synced 2026-06-07 15:52:08 -04:00
commit
06a989e5a3
2 changed files with 36 additions and 33 deletions
|
|
@ -54,8 +54,25 @@ def check_permissions(filepath, mode, uid=0):
|
|||
return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid
|
||||
|
||||
|
||||
def _safely_attempt_open(fname, mode):
|
||||
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
|
||||
return os.fdopen(file_d, "w"), fname
|
||||
|
||||
|
||||
def _unique_file(path, filename_pat, count, mode):
|
||||
while True:
|
||||
try:
|
||||
return _safely_attempt_open(
|
||||
os.path.join(path, filename_pat(count)), mode)
|
||||
except OSError as err:
|
||||
# "File exists," is okay, try a different name.
|
||||
if err.errno != errno.EEXIST:
|
||||
raise
|
||||
count += 1
|
||||
|
||||
|
||||
def unique_file(path, mode=0o777):
|
||||
"""Safely finds a unique file for writing only (by default).
|
||||
"""Safely finds a unique file.
|
||||
|
||||
:param str path: path/filename.ext
|
||||
:param int mode: File mode
|
||||
|
|
@ -64,52 +81,35 @@ def unique_file(path, mode=0o777):
|
|||
|
||||
"""
|
||||
path, tail = os.path.split(path)
|
||||
count = 0
|
||||
while True:
|
||||
fname = os.path.join(path, "%04d_%s" % (count, tail))
|
||||
try:
|
||||
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
|
||||
return os.fdopen(file_d, "w"), fname
|
||||
except OSError as exception:
|
||||
# "File exists," is okay, try a different name.
|
||||
if exception.errno != errno.EEXIST:
|
||||
raise
|
||||
count += 1
|
||||
return _unique_file(
|
||||
path, filename_pat=(lambda count: "%04d_%s" % (count, tail)),
|
||||
count=0, mode=mode)
|
||||
|
||||
|
||||
def unique_lineage_name(path, filename, mode=0o777):
|
||||
"""Safely finds a unique file for writing only (by default). Uses a
|
||||
file lineage convention.
|
||||
"""Safely finds a unique file using lineage convention.
|
||||
|
||||
:param str path: directory path
|
||||
:param str filename: proposed filename
|
||||
:param int mode: file mode
|
||||
|
||||
:returns: tuple of file object and file name (which may be modified from
|
||||
the requested one by appending digits to ensure uniqueness)
|
||||
:returns: tuple of file object and file name (which may be modified
|
||||
from the requested one by appending digits to ensure uniqueness)
|
||||
|
||||
:raises OSError: if writing files fails for an unanticipated reason,
|
||||
such as a full disk or a lack of permission to write to specified
|
||||
location.
|
||||
such as a full disk or a lack of permission to write to
|
||||
specified location.
|
||||
|
||||
"""
|
||||
fname = os.path.join(path, "%s.conf" % (filename))
|
||||
try:
|
||||
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
|
||||
return os.fdopen(file_d, "w"), fname
|
||||
return _safely_attempt_open(
|
||||
os.path.join(path, "%s.conf" % (filename)), mode=mode)
|
||||
except OSError as err:
|
||||
if err.errno != errno.EEXIST:
|
||||
raise err
|
||||
count = 1
|
||||
while True:
|
||||
fname = os.path.join(path, "%s-%04d.conf" % (filename, count))
|
||||
try:
|
||||
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
|
||||
return os.fdopen(file_d, "w"), fname
|
||||
except OSError as err:
|
||||
if err.errno != errno.EEXIST:
|
||||
raise err
|
||||
count += 1
|
||||
raise
|
||||
return _unique_file(
|
||||
path, filename_pat=(lambda count: "%s-%04d.conf" % (filename, count)),
|
||||
count=1, mode=mode)
|
||||
|
||||
|
||||
def safely_remove(path):
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ import unittest
|
|||
|
||||
import mock
|
||||
|
||||
from letsencrypt import errors
|
||||
|
||||
|
||||
class MakeOrVerifyDirTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.le_util.make_or_verify_dir.
|
||||
|
|
@ -42,7 +44,8 @@ class MakeOrVerifyDirTest(unittest.TestCase):
|
|||
self.assertEqual(stat.S_IMODE(os.stat(self.path).st_mode), 0o400)
|
||||
|
||||
def test_existing_wrong_mode_fails(self):
|
||||
self.assertRaises(Exception, self._call, self.path, 0o600)
|
||||
self.assertRaises(
|
||||
errors.LetsEncryptClientError, self._call, self.path, 0o600)
|
||||
|
||||
def test_reraises_os_error(self):
|
||||
with mock.patch.object(os, 'makedirs') as makedirs:
|
||||
|
|
|
|||
Loading…
Reference in a new issue