diff --git a/letsencrypt/le_util.py b/letsencrypt/le_util.py index 583754ab5..e525a333c 100644 --- a/letsencrypt/le_util.py +++ b/letsencrypt/le_util.py @@ -58,16 +58,30 @@ def check_permissions(filepath, mode, uid=0): return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid -def _safely_attempt_open(fname, mode): - file_d = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_RDWR, mode) - return os.fdopen(file_d, "w"), fname +def safe_open(path, mode="w", chmod=None, buffering=None): + """Safely open a file. + + :param str path: Path to a file. + :param str mode: Same os `mode` for `open`. + :param int chmod: Same as `mode` for `os.open`, uses Python defaults + if ``None``. + :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python + defaults if ``None``. + + """ + # pylint: disable=star-args + open_args = () if chmod is None else (chmod,) + fdopen_args = () if buffering is None else (buffering,) + return os.fdopen( + os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args), + mode, *fdopen_args) def _unique_file(path, filename_pat, count, mode): while True: + current_path = os.path.join(path, filename_pat(count)) try: - return _safely_attempt_open( - os.path.join(path, filename_pat(count)), mode) + return safe_open(current_path, chmod=mode), current_path except OSError as err: # "File exists," is okay, try a different name. if err.errno != errno.EEXIST: @@ -105,9 +119,9 @@ def unique_lineage_name(path, filename, mode=0o777): specified location. """ + preferred_path = os.path.join(path, "%s.conf" % (filename)) try: - return _safely_attempt_open( - os.path.join(path, "%s.conf" % (filename)), mode=mode) + return safe_open(preferred_path, chmod=mode), preferred_path except OSError as err: if err.errno != errno.EEXIST: raise diff --git a/letsencrypt/tests/le_util_test.py b/letsencrypt/tests/le_util_test.py index 5a6364e81..1ecc1ea16 100644 --- a/letsencrypt/tests/le_util_test.py +++ b/letsencrypt/tests/le_util_test.py @@ -21,7 +21,7 @@ class MakeOrVerifyDirTest(unittest.TestCase): def setUp(self): self.root_path = tempfile.mkdtemp() - self.path = os.path.join(self.root_path, 'foo') + self.path = os.path.join(self.root_path, "foo") os.mkdir(self.path, 0o400) self.uid = os.getuid() @@ -34,7 +34,7 @@ class MakeOrVerifyDirTest(unittest.TestCase): return make_or_verify_dir(directory, mode, self.uid) def test_creates_dir_when_missing(self): - path = os.path.join(self.root_path, 'bar') + path = os.path.join(self.root_path, "bar") self._call(path, 0o650) self.assertTrue(os.path.isdir(path)) self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o650) @@ -47,9 +47,9 @@ class MakeOrVerifyDirTest(unittest.TestCase): self.assertRaises(errors.Error, self._call, self.path, 0o600) def test_reraises_os_error(self): - with mock.patch.object(os, 'makedirs') as makedirs: + with mock.patch.object(os, "makedirs") as makedirs: makedirs.side_effect = OSError() - self.assertRaises(OSError, self._call, 'bar', 12312312) + self.assertRaises(OSError, self._call, "bar", 12312312) class CheckPermissionsTest(unittest.TestCase): @@ -85,7 +85,7 @@ class UniqueFileTest(unittest.TestCase): def setUp(self): self.root_path = tempfile.mkdtemp() - self.default_name = os.path.join(self.root_path, 'foo.txt') + self.default_name = os.path.join(self.root_path, "foo.txt") def tearDown(self): shutil.rmtree(self.root_path, ignore_errors=True) @@ -96,9 +96,9 @@ class UniqueFileTest(unittest.TestCase): def test_returns_fd_for_writing(self): fd, name = self._call() - fd.write('bar') + fd.write("bar") fd.close() - self.assertEqual(open(name).read(), 'bar') + self.assertEqual(open(name).read(), "bar") def test_right_mode(self): self.assertEqual(0o700, os.stat(self._call(0o700)[1]).st_mode & 0o777) @@ -118,11 +118,11 @@ class UniqueFileTest(unittest.TestCase): self.assertEqual(os.path.dirname(name3), self.root_path) basename1 = os.path.basename(name2) - self.assertTrue(basename1.endswith('foo.txt')) + self.assertTrue(basename1.endswith("foo.txt")) basename2 = os.path.basename(name2) - self.assertTrue(basename2.endswith('foo.txt')) + self.assertTrue(basename2.endswith("foo.txt")) basename3 = os.path.basename(name3) - self.assertTrue(basename3.endswith('foo.txt')) + self.assertTrue(basename3.endswith("foo.txt")) class UniqueLineageNameTest(unittest.TestCase): @@ -139,9 +139,9 @@ class UniqueLineageNameTest(unittest.TestCase): return unique_lineage_name(self.root_path, filename, mode) def test_basic(self): - f, name = self._call("wow") + f, path = self._call("wow") self.assertTrue(isinstance(f, file)) - self.assertTrue(isinstance(name, str)) + self.assertEqual(os.path.join(self.root_path, "wow.conf"), path) def test_multiple(self): for _ in xrange(10): @@ -192,5 +192,5 @@ class SafeEmailTest(unittest.TestCase): self.assertFalse(self._call(addr), "%s failed." % addr) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() # pragma: no cover