Expose le_util.safe_open.

This commit is contained in:
Jakub Warmuz 2015-07-09 10:55:51 +00:00
parent 3e2d1c8abc
commit 0d24f52f6e
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
2 changed files with 34 additions and 20 deletions

View file

@ -58,16 +58,30 @@ def check_permissions(filepath, mode, uid=0):
return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid
def _safely_attempt_open(fname, mode):
file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode)
return os.fdopen(file_d, "w"), fname
def safe_open(path, mode="w", chmod=None, buffering=None):
"""Safely open a file.
:param str path: Path to a file.
:param str mode: Same os `mode` for `open`.
:param int chmod: Same as `mode` for `os.open`, uses Python defaults
if ``None``.
:param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
defaults if ``None``.
"""
# pylint: disable=star-args
open_args = () if chmod is None else (chmod,)
fdopen_args = () if buffering is None else (buffering,)
return os.fdopen(
os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
mode, *fdopen_args)
def _unique_file(path, filename_pat, count, mode):
while True:
current_path = os.path.join(path, filename_pat(count))
try:
return _safely_attempt_open(
os.path.join(path, filename_pat(count)), mode)
return safe_open(current_path, chmod=mode), current_path
except OSError as err:
# "File exists," is okay, try a different name.
if err.errno != errno.EEXIST:
@ -105,9 +119,9 @@ def unique_lineage_name(path, filename, mode=0o777):
specified location.
"""
preferred_path = os.path.join(path, "%s.conf" % (filename))
try:
return _safely_attempt_open(
os.path.join(path, "%s.conf" % (filename)), mode=mode)
return safe_open(preferred_path, chmod=mode), preferred_path
except OSError as err:
if err.errno != errno.EEXIST:
raise

View file

@ -21,7 +21,7 @@ class MakeOrVerifyDirTest(unittest.TestCase):
def setUp(self):
self.root_path = tempfile.mkdtemp()
self.path = os.path.join(self.root_path, 'foo')
self.path = os.path.join(self.root_path, "foo")
os.mkdir(self.path, 0o400)
self.uid = os.getuid()
@ -34,7 +34,7 @@ class MakeOrVerifyDirTest(unittest.TestCase):
return make_or_verify_dir(directory, mode, self.uid)
def test_creates_dir_when_missing(self):
path = os.path.join(self.root_path, 'bar')
path = os.path.join(self.root_path, "bar")
self._call(path, 0o650)
self.assertTrue(os.path.isdir(path))
self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o650)
@ -47,9 +47,9 @@ class MakeOrVerifyDirTest(unittest.TestCase):
self.assertRaises(errors.Error, self._call, self.path, 0o600)
def test_reraises_os_error(self):
with mock.patch.object(os, 'makedirs') as makedirs:
with mock.patch.object(os, "makedirs") as makedirs:
makedirs.side_effect = OSError()
self.assertRaises(OSError, self._call, 'bar', 12312312)
self.assertRaises(OSError, self._call, "bar", 12312312)
class CheckPermissionsTest(unittest.TestCase):
@ -85,7 +85,7 @@ class UniqueFileTest(unittest.TestCase):
def setUp(self):
self.root_path = tempfile.mkdtemp()
self.default_name = os.path.join(self.root_path, 'foo.txt')
self.default_name = os.path.join(self.root_path, "foo.txt")
def tearDown(self):
shutil.rmtree(self.root_path, ignore_errors=True)
@ -96,9 +96,9 @@ class UniqueFileTest(unittest.TestCase):
def test_returns_fd_for_writing(self):
fd, name = self._call()
fd.write('bar')
fd.write("bar")
fd.close()
self.assertEqual(open(name).read(), 'bar')
self.assertEqual(open(name).read(), "bar")
def test_right_mode(self):
self.assertEqual(0o700, os.stat(self._call(0o700)[1]).st_mode & 0o777)
@ -118,11 +118,11 @@ class UniqueFileTest(unittest.TestCase):
self.assertEqual(os.path.dirname(name3), self.root_path)
basename1 = os.path.basename(name2)
self.assertTrue(basename1.endswith('foo.txt'))
self.assertTrue(basename1.endswith("foo.txt"))
basename2 = os.path.basename(name2)
self.assertTrue(basename2.endswith('foo.txt'))
self.assertTrue(basename2.endswith("foo.txt"))
basename3 = os.path.basename(name3)
self.assertTrue(basename3.endswith('foo.txt'))
self.assertTrue(basename3.endswith("foo.txt"))
class UniqueLineageNameTest(unittest.TestCase):
@ -139,9 +139,9 @@ class UniqueLineageNameTest(unittest.TestCase):
return unique_lineage_name(self.root_path, filename, mode)
def test_basic(self):
f, name = self._call("wow")
f, path = self._call("wow")
self.assertTrue(isinstance(f, file))
self.assertTrue(isinstance(name, str))
self.assertEqual(os.path.join(self.root_path, "wow.conf"), path)
def test_multiple(self):
for _ in xrange(10):
@ -192,5 +192,5 @@ class SafeEmailTest(unittest.TestCase):
self.assertFalse(self._call(addr), "%s failed." % addr)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main() # pragma: no cover