pathlib refactor fslocking

This commit is contained in:
Thomas Waldmann 2025-06-10 13:40:43 +02:00
parent 4ff49a6e91
commit 3c8cfd0e29
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 35 additions and 32 deletions

View file

@ -3,6 +3,7 @@ import json
import os
import tempfile
import time
from pathlib import Path
from . import platform
from .helpers import Error, ErrorWithTraceback
@ -119,9 +120,9 @@ class ExclusiveLock:
def __init__(self, path, timeout=None, sleep=None, id=None):
self.timeout = timeout
self.sleep = sleep
self.path = os.path.abspath(path)
self.path = Path(path).absolute()
self.id = id or platform.get_process_id()
self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
self.unique_name = self.path / ("%s.%d-%x" % self.id)
self.kill_stale_locks = True
self.stale_warning_printed = False
@ -132,34 +133,34 @@ class ExclusiveLock:
self.release()
def __repr__(self):
return f"<{self.__class__.__name__}: {self.unique_name!r}>"
return f"<{self.__class__.__name__}: {str(self.unique_name)!r}>"
def acquire(self, timeout=None, sleep=None):
if timeout is None:
timeout = self.timeout
if sleep is None:
sleep = self.sleep
parent_path, base_name = os.path.split(self.path)
unique_base_name = os.path.basename(self.unique_name)
parent_path, base_name = str(self.path.parent), self.path.name
unique_base_name = self.unique_name.name
temp_path = None
try:
temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path)
temp_unique_name = os.path.join(temp_path, unique_base_name)
with open(temp_unique_name, "wb"):
temp_unique_name = Path(temp_path) / unique_base_name
with temp_unique_name.open("wb"):
pass
except OSError as err:
raise LockFailed(self.path, str(err)) from None
raise LockFailed(str(self.path), str(err)) from None
else:
timer = TimeoutTimer(timeout, sleep).start()
while True:
try:
os.replace(temp_path, self.path)
Path(temp_path).replace(str(self.path))
except OSError: # already locked
if self.by_me():
return self
self.kill_stale_lock()
if timer.timed_out_or_sleep():
raise LockTimeout(self.path) from None
raise LockTimeout(str(self.path)) from None
else:
temp_path = None # see finally:-block below
return self
@ -178,13 +179,13 @@ class ExclusiveLock:
def release(self):
if not self.is_locked():
raise NotLocked(self.path)
raise NotLocked(str(self.path))
if not self.by_me():
raise NotMyLock(self.path)
os.unlink(self.unique_name)
raise NotMyLock(str(self.path))
self.unique_name.unlink()
for retry in range(42):
try:
os.rmdir(self.path)
self.path.rmdir()
except OSError as err:
if err.errno in (errno.EACCES,):
# windows behaving strangely? -> just try again.
@ -198,14 +199,14 @@ class ExclusiveLock:
return
def is_locked(self):
return os.path.exists(self.path)
return self.path.exists()
def by_me(self):
return os.path.exists(self.unique_name)
return self.unique_name.exists()
def kill_stale_lock(self):
try:
names = os.listdir(self.path)
names = [p.name for p in self.path.iterdir()]
except FileNotFoundError: # another process did our job in the meantime.
return False
except PermissionError: # win32 might throw this.
@ -219,7 +220,7 @@ class ExclusiveLock:
thread = int(thread_str, 16)
except ValueError:
# Malformed lock name? Or just some new format we don't understand?
logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, str(self.path))
return False
if platform.process_alive(host, pid, thread):
@ -235,7 +236,7 @@ class ExclusiveLock:
return False
try:
os.unlink(os.path.join(self.path, name))
(self.path / name).unlink()
logger.warning("Killed stale lock %s.", name)
except OSError as err:
if not self.stale_warning_printed:
@ -245,7 +246,7 @@ class ExclusiveLock:
return False
try:
os.rmdir(self.path)
self.path.rmdir()
except OSError as err:
if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
# Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
@ -258,18 +259,18 @@ class ExclusiveLock:
def break_lock(self):
if self.is_locked():
for name in os.listdir(self.path):
os.unlink(os.path.join(self.path, name))
os.rmdir(self.path)
for path_obj in self.path.iterdir():
path_obj.unlink()
self.path.rmdir()
def migrate_lock(self, old_id, new_id):
"""migrate the lock ownership from old_id to new_id"""
assert self.id == old_id
new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
new_unique_name = self.path / ("%s.%d-%x" % new_id)
if self.is_locked() and self.by_me():
with open(new_unique_name, "wb"):
with new_unique_name.open("wb"):
pass
os.unlink(self.unique_name)
self.unique_name.unlink()
self.id, self.unique_name = new_id, new_unique_name
@ -282,13 +283,14 @@ class LockRoster:
"""
def __init__(self, path, id=None):
assert isinstance(path, Path)
self.path = path
self.id = id or platform.get_process_id()
self.kill_stale_locks = True
def load(self):
try:
with open(self.path) as f:
with self.path.open() as f:
data = json.load(f)
# Just nuke the stale locks early on load
@ -313,12 +315,12 @@ class LockRoster:
return data
def save(self, data):
with open(self.path, "w") as f:
with self.path.open("w") as f:
json.dump(data, f)
def remove(self):
try:
os.unlink(self.path)
self.path.unlink()
except FileNotFoundError:
pass
@ -392,11 +394,11 @@ class Lock:
self.timeout = timeout
self.id = id or platform.get_process_id()
# globally keeping track of shared and exclusive lockers:
self._roster = LockRoster(path + ".roster", id=id)
self._roster = LockRoster(Path(path + ".roster"), id=id)
# an exclusive lock, used for:
# - holding while doing roster queries / updates
# - holding while the Lock itself is exclusive
self._lock = ExclusiveLock(path + ".exclusive", id=id, timeout=timeout)
self._lock = ExclusiveLock(str(Path(path + ".exclusive")), id=id, timeout=timeout)
def __enter__(self):
return self.acquire()

View file

@ -1,5 +1,6 @@
import random
import time
from pathlib import Path
from threading import Thread, Lock as ThreadingLock
from traceback import format_exc
@ -306,7 +307,7 @@ class TestLock:
@pytest.fixture()
def rosterpath(tmpdir):
return str(tmpdir.join("roster"))
return Path(tmpdir) / "roster"
class TestLockRoster: