mirror of
https://github.com/borgbackup/borg.git
synced 2026-05-23 18:45:53 -04:00
Some checks failed
Lint / lint (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / security (push) Has been cancelled
CodeQL / Analyze (push) Has been cancelled
CI / asan_ubsan (push) Has been cancelled
CI / native_tests (push) Has been cancelled
CI / vm_tests (Haiku, false, haiku, r1beta5) (push) Has been cancelled
CI / vm_tests (NetBSD, false, netbsd, 10.1) (push) Has been cancelled
CI / vm_tests (OmniOS, false, omnios, r151056) (push) Has been cancelled
CI / vm_tests (OpenBSD, false, openbsd, 7.7) (push) Has been cancelled
CI / vm_tests (borg-freebsd-14-x86_64-gh, FreeBSD, true, freebsd, 14.3) (push) Has been cancelled
CI / windows_tests (push) Has been cancelled
271 lines
12 KiB
Python
271 lines
12 KiB
Python
import datetime
|
|
import json
|
|
import random
|
|
import time
|
|
|
|
from xxhash import xxh64
|
|
|
|
from borgstore.store import ObjectNotFound
|
|
|
|
from . import platform
|
|
from .helpers import Error, ErrorWithTraceback
|
|
from .logger import create_logger
|
|
|
|
# Module-level logger; all LOCK-* debug messages of the lock classes below go through it.
logger = create_logger(__name__)
|
|
|
|
|
|
# Base lock error (without traceback).
# NOTE(review): the docstring contains a "{}" placeholder and appears to be the message template the
# Error base class fills from the exception args - keep it verbatim, do not turn it into prose.
class LockError(Error):
    """Failed to acquire the lock {}."""

    # presumably the exit code reported when this error aborts the program - confirm against Error
    exit_mcode = 70
|
|
|
|
|
|
# Base lock error variant derived from ErrorWithTraceback (presumably reported with a traceback - confirm).
# NOTE(review): the docstring is used as a "{}"-style message template - keep it verbatim.
class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""

    # presumably the exit code reported when this error aborts the program - confirm against the base class
    exit_mcode = 71
|
|
|
|
|
|
# Creating/acquiring a lock failed for some reason other than a timeout.
# The two "{}" placeholders suggest it is raised with (lock, reason) args - not raised in this section.
# NOTE(review): the docstring is the message template - keep it verbatim.
class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""

    # presumably the exit code reported when this error aborts the program
    exit_mcode = 72
|
|
|
|
|
|
# Raised by Lock.acquire() when the lock could not be obtained within the timeout, and by
# Lock.refresh() when our own lock has vanished from the store; both pass str(store) as the argument.
# NOTE(review): the docstring is the message template - keep it verbatim.
class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""

    # presumably the exit code reported when this error aborts the program
    exit_mcode = 73
|
|
|
|
|
|
# Raised by Lock.release() when none of our locks is found and ignore_not_found is false.
# NOTE(review): the docstring is the message template - keep it verbatim.
class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""

    # presumably the exit code reported when this error aborts the program
    exit_mcode = 74
|
|
|
|
|
|
# Releasing failed because the lock is held by someone else (not raised anywhere in this section).
# NOTE(review): the docstring is the message template - keep it verbatim.
class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""

    # presumably the exit code reported when this error aborts the program
    exit_mcode = 75
|
|
|
|
|
|
class Lock:
    """
    A lock for a resource that can be accessed in a shared or exclusive way.
    Typically, write access to a resource needs an exclusive lock (one writer,
    no readers allowed), and read access to a resource needs a shared lock
    (multiple readers are allowed).

    If possible, use the context manager form::

        with Lock(...) as lock:
            ...

    This ensures the lock is released when the block is exited, no matter how
    (e.g., if an exception occurs).

    Every lock is stored as a small JSON document at ``locks/<key>`` in the store,
    where ``<key>`` is the xxh64 hex digest of that document. The document records
    whether the lock is exclusive, the owner id (hostid, processid, threadid) and a
    UTC timestamp with millisecond precision. Locks older than ``stale`` seconds, or
    whose owner process is known to be dead, are ignored and deleted when seen.
    """

    def __init__(self, store, exclusive=False, sleep=None, timeout=1.0, stale=30 * 60, id=None):
        """
        :param store: object holding the lock objects; it must provide list(), load(),
                      store() and delete().
        :param exclusive: True to request an exclusive lock, False for a shared one.
        :param sleep: accepted and stored for API compatibility, not used by this class.
        :param timeout: seconds acquire() keeps trying before it raises LockTimeout.
        :param stale: seconds after which a not-refreshed lock is considered stale;
                      refresh() renews our lock once it is older than half of that.
        :param id: owner id tuple (hostid, processid, threadid);
                   defaults to platform.get_process_id().
        """
        self.store = store
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.race_recheck_delay = 0.01  # local: 0.01, network/slow remote: >= 1.0
        self.other_locks_go_away_delay = 0.1  # local: 0.1, network/slow remote: >= 1.0
        self.retry_delay_min = 1.0
        self.retry_delay_max = 5.0
        self.stale_td = datetime.timedelta(seconds=stale)  # ignore/delete it if older
        self.refresh_td = datetime.timedelta(seconds=stale // 2)  # don't refresh it if younger
        self.last_refresh_dt = None
        self.id = id or platform.get_process_id()
        assert len(self.id) == 3
        logger.debug(f"LOCK-INIT: initializing. store: {store}, stale: {stale}s, refresh: {stale // 2}s.")

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        ignore_not_found = exc_type is not None
        # if there was an exception, try to release the lock,
        # but don't raise another exception while trying if it was not there.
        self.release(ignore_not_found=ignore_not_found)

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.id!r}>"

    def _create_lock(self, *, exclusive=None, update_last_refresh=False):
        """Store a new lock object owned by self.id and return its key."""
        assert exclusive is not None
        now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = now.isoformat(timespec="milliseconds")
        lock = dict(exclusive=exclusive, hostid=self.id[0], processid=self.id[1], threadid=self.id[2], time=timestamp)
        value = json.dumps(lock).encode("utf-8")
        key = xxh64(value).hexdigest()
        logger.debug(f"LOCK-CREATE: creating lock in store. key: {key}, lock: {lock}.")
        self.store.store(f"locks/{key}", value)
        if update_last_refresh:
            # we parse the timestamp string to get *precisely* the datetime in the lock:
            self.last_refresh_dt = datetime.datetime.fromisoformat(timestamp)
        return key

    def _delete_lock(self, key, *, ignore_not_found=False, update_last_refresh=False):
        """Delete the lock object *key*; ObjectNotFound is re-raised unless ignore_not_found."""
        logger.debug(f"LOCK-DELETE: deleting lock from store. key: {key}.")
        try:
            self.store.delete(f"locks/{key}")
        except ObjectNotFound:
            if not ignore_not_found:
                raise
        finally:
            if update_last_refresh:
                self.last_refresh_dt = None

    def _is_our_lock(self, lock):
        """Return True if *lock* is owned by this Lock's id."""
        return self.id == (lock["hostid"], lock["processid"], lock["threadid"])

    def _is_stale_lock(self, lock):
        """Return True if *lock* is too old or its owning process is known to be dead."""
        now = datetime.datetime.now(datetime.timezone.utc)
        if now > lock["dt"] + self.stale_td:
            logger.debug(f"LOCK-STALE: lock is too old, it was not refreshed. lock: {lock}.")
            return True
        if not platform.process_alive(lock["hostid"], lock["processid"], lock["threadid"]):
            logger.debug(f"LOCK-STALE: we KNOW that the lock-owning process is dead. lock: {lock}.")
            return True
        return False

    def _get_locks(self):
        """
        Return a dict key -> lock of all non-stale locks in the store.

        Stale locks found on the way are deleted (even if they are not ours).
        """
        locks = {}
        try:
            infos = list(self.store.list("locks"))
        except ObjectNotFound:
            return {}
        for info in infos:
            key = info.name
            try:
                content = self.store.load(f"locks/{key}")
            except ObjectNotFound:
                # listing and loading are not atomic: another client may have released (deleted) this
                # lock, or removed it as stale, in the meantime. a vanished lock blocks nobody, skip it.
                logger.debug(f"LOCK-GET: lock vanished while reading it, ignoring it. key: {key}.")
                continue
            lock = json.loads(content.decode("utf-8"))
            lock["key"] = key
            lock["dt"] = datetime.datetime.fromisoformat(lock["time"])
            if self._is_stale_lock(lock):
                # ignore it and delete it (even if it is not from us)
                self._delete_lock(key, ignore_not_found=True, update_last_refresh=self._is_our_lock(lock))
            else:
                locks[key] = lock
        return locks

    def _find_locks(self, *, only_exclusive=False, only_mine=False):
        """Return the non-stale locks, optionally filtered to exclusive ones and/or to ours."""
        found_locks = []
        for lock in self._get_locks().values():
            if only_exclusive and not lock["exclusive"]:
                continue
            if only_mine and not self._is_our_lock(lock):
                continue
            found_locks.append(lock)
        return found_locks

    def acquire(self):
        """
        Acquire the lock (exclusive or shared, as configured) and return self.

        :raises LockTimeout: if the lock could not be acquired within self.timeout seconds.
        """
        # goal
        # for exclusive lock: there must be only 1 exclusive lock and no other (exclusive or non-exclusive) locks.
        # for non-exclusive lock: there can be multiple n-e locks, but there must not exist an exclusive lock.
        logger.debug(f"LOCK-ACQUIRE: trying to acquire a lock. exclusive: {self.is_exclusive}.")
        started = time.monotonic()
        while time.monotonic() - started < self.timeout:
            exclusive_locks = self._find_locks(only_exclusive=True)
            if len(exclusive_locks) == 0:
                # looks like there are no exclusive locks, create our lock.
                key = self._create_lock(exclusive=self.is_exclusive, update_last_refresh=True)
                # obviously we have a race condition here: other client(s) might have created exclusive
                # lock(s) at the same time in parallel. thus we have to check again.
                # give other clients time to notice our exclusive lock, stop creating theirs:
                time.sleep(self.race_recheck_delay)
                exclusive_locks = self._find_locks(only_exclusive=True)
                if self.is_exclusive:
                    if len(exclusive_locks) == 1 and exclusive_locks[0]["key"] == key:
                        logger.debug("LOCK-ACQUIRE: we are the only exclusive lock!")
                        while time.monotonic() - started < self.timeout:
                            locks = self._find_locks(only_exclusive=False)
                            if len(locks) == 1 and locks[0]["key"] == key:
                                logger.debug("LOCK-ACQUIRE: success! no non-exclusive locks are left!")
                                return self
                            time.sleep(self.other_locks_go_away_delay)
                        logger.debug("LOCK-ACQUIRE: timeout while waiting for non-exclusive locks to go away.")
                        # we give up, so remove our exclusive lock: otherwise it would keep blocking all other
                        # clients until it gets stale. when used via `with`, a raising __enter__ means
                        # __exit__ never runs, so nobody else would clean it up.
                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
                        break  # timeout
                    else:
                        logger.debug("LOCK-ACQUIRE: someone else also created an exclusive lock, deleting ours.")
                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
                else:  # not is_exclusive
                    if len(exclusive_locks) == 0:
                        logger.debug("LOCK-ACQUIRE: success! no exclusive locks detected.")
                        # We don't care for other non-exclusive locks.
                        return self
                    else:
                        logger.debug("LOCK-ACQUIRE: exclusive locks detected, deleting our shared lock.")
                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
            # wait a random bit before retrying
            time.sleep(
                self.retry_delay_min + (self.retry_delay_max - self.retry_delay_min) * random.random()  # nosec B311
            )
        logger.debug("LOCK-ACQUIRE: timeout while trying to acquire a lock.")
        raise LockTimeout(str(self.store))

    def release(self, *, ignore_not_found=False):
        """
        Release our lock.

        :raises NotLocked: if we hold no lock and ignore_not_found is false.
        """
        self.last_refresh_dt = None
        locks = self._find_locks(only_mine=True)
        if not locks:
            if ignore_not_found:
                logger.debug("LOCK-RELEASE: trying to release the lock, but none was found.")
                return
            else:
                raise NotLocked(str(self.store))
        assert len(locks) == 1
        lock = locks[0]
        logger.debug(f"LOCK-RELEASE: releasing lock: {lock}.")
        self._delete_lock(lock["key"], ignore_not_found=True, update_last_refresh=True)

    def got_exclusive_lock(self):
        """Return True if we currently hold an exclusive lock."""
        locks = self._find_locks(only_mine=True, only_exclusive=True)
        return len(locks) == 1

    def break_lock(self):
        """Breaks all locks (not just ours)."""
        logger.debug("LOCK-BREAK: break_lock() was called - deleting ALL locks!")
        locks = self._get_locks()
        for key in locks:
            self._delete_lock(key, ignore_not_found=True)
        self.last_refresh_dt = None

    def migrate_lock(self, old_id, new_id):
        """Migrates the lock ownership from old_id to new_id."""
        logger.debug(f"LOCK-MIGRATE: {old_id} -> {new_id}.")
        assert self.id == old_id
        assert len(new_id) == 3
        old_locks = self._find_locks(only_mine=True)
        assert len(old_locks) == 1
        self.id = new_id
        # create the new lock before deleting the old one, so the resource is never unlocked in between
        self._create_lock(exclusive=old_locks[0]["exclusive"], update_last_refresh=True)
        self._delete_lock(old_locks[0]["key"], update_last_refresh=False)

    def refresh(self):
        """
        Refreshes the lock; call this frequently, but not later than every <stale> seconds.

        :raises LockTimeout: if our lock has vanished from the store (e.g. it was broken or expired).
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        if self.last_refresh_dt is not None and now > self.last_refresh_dt + self.refresh_td:
            old_locks = self._find_locks(only_mine=True)
            if len(old_locks) == 0:
                # crap, my lock has been removed. :-(
                # this can happen e.g. if my machine has been suspended while doing a backup, so that the
                # lock will auto-expire. a borg client on another machine might then kill that lock.
                # if my machine then wakes up again, the lock will have vanished and we get here.
                # in this case, we need to abort the operation, because the other borg might have removed
                # repo objects we have written, but the referential tree was not yet fully present, e.g.
                # no archive has been added yet to the manifest, thus all objects looked unused/orphaned.
                # another scenario when this can happen is a careless user running break-lock on another
                # machine without making sure there is no borg activity in that repo.
                logger.debug("LOCK-REFRESH: our lock was killed, there is no safe way to continue.")
                raise LockTimeout(str(self.store))
            assert len(old_locks) == 1  # there shouldn't be more than 1
            old_lock = old_locks[0]
            if now > old_lock["dt"] + self.refresh_td:
                logger.debug(f"LOCK-REFRESH: lock needs a refresh. lock: {old_lock}.")
                # create the fresh lock first, then delete the old one, so we are never unlocked in between
                self._create_lock(exclusive=old_lock["exclusive"], update_last_refresh=True)
                self._delete_lock(old_lock["key"], update_last_refresh=False)