This commit is contained in:
Yash Kaushik 2026-04-06 02:13:26 +00:00 committed by GitHub
commit c4fd5d917b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 301 additions and 129 deletions

View file

@ -1408,7 +1408,13 @@ class FilesystemObjectProcessors:
item.uid = uid
if gid is not None:
item.gid = gid
self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
self.print_file_status(None, path, phase="start")
try:
self.process_file_chunks(
item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
)
finally:
self.print_file_status(None, path, phase="end")
item.get_size(memorize=True)
self.stats.nfiles += 1
self.add_item(item, stats=self.stats)
@ -1475,17 +1481,21 @@ class FilesystemObjectProcessors:
# Only chunkify the file if needed
changed_while_backup = False
if "chunks" not in item:
start_reading = time.time_ns()
with backup_io("read"):
self.process_file_chunks(
item,
cache,
self.stats,
self.show_progress,
backup_io_iter(self.chunker.chunkify(None, fd)),
)
self.stats.chunking_time = self.chunker.chunking_time
end_reading = time.time_ns()
self.print_file_status(None, path, phase="start")
try:
start_reading = time.time_ns()
with backup_io("read"):
self.process_file_chunks(
item,
cache,
self.stats,
self.show_progress,
backup_io_iter(self.chunker.chunkify(None, fd)),
)
self.stats.chunking_time = self.chunker.chunking_time
end_reading = time.time_ns()
finally:
self.print_file_status(None, path, phase="end")
with backup_io("fstat2"):
st2 = os.fstat(fd)
if self.files_changed == "disabled" or is_special_file:

View file

@ -155,12 +155,34 @@ class Archiver(
msg, msgid, args, wc = cls.__doc__, cls.__qualname__, warning.args, warning.exit_code
self.print_warning(msg, *args, wc=wc, wt="curly", msgid=msgid)
def print_file_status(self, status, path, *, phase=None, error=None):
    """Emit one file-status event, either as JSON on stderr or via the list logger.

    Two kinds of events are produced:

    * Lifecycle events: called with ``status=None`` and ``phase`` set to
      ``"start"`` or ``"end"``. These mark the beginning/end of processing a
      file and are only emitted in ``--log-json`` list mode.
    * Regular status events: called with a one-letter ``status`` code
      (A, M, U, -, d, s, ...). Subject to ``self.output_filter``; rendered as
      JSON when ``--log-json`` is active, otherwise via the
      ``borg.output.list`` logger.

    :param status: one-letter file status code, or None for lifecycle events.
    :param path: path of the file the event refers to.
    :param phase: optional lifecycle phase ("start" / "end"); also attached
                  to regular JSON events when given.
    :param error: optional error text to attach to the JSON event.
    """
    # Lifecycle (start/end) events: JSON-only, no status code, never filtered
    # by output_filter (the filter applies to status codes, which are absent here).
    if self.output_list and self.log_json and status is None and phase in ("start", "end"):
        json_data = {"type": "file_status", "phase": phase}
        json_data |= text_to_json("path", path)
        if error is not None:
            json_data["error"] = error
        print(json.dumps(json_data), file=sys.stderr)
        return
    # Regular status event (A, M, U, -, d, s, etc.), honoring --list-filter.
    if self.output_list and status is not None and (self.output_filter is None or status in self.output_filter):
        if self.log_json:
            json_data = {"type": "file_status", "status": status}
            json_data |= text_to_json("path", path)
            if phase is not None:
                json_data["phase"] = phase
            if error is not None:
                json_data["error"] = error
            print(json.dumps(json_data), file=sys.stderr)
        else:
            logging.getLogger("borg.output.list").info("%1s %s", status, remove_surrogates(path))

View file

@ -90,7 +90,7 @@ class CreateMixIn:
raise Error(f"{path!r}: {e}")
else:
status = "+" # included
self.print_file_status(status, path)
self.print_file_status(status, path, phase="end")
elif args.paths_from_command or args.paths_from_shell_command or args.paths_from_stdin:
paths_sep = eval_escapes(args.paths_delimiter) if args.paths_delimiter is not None else "\n"
if args.paths_from_command or args.paths_from_shell_command:
@ -139,7 +139,7 @@ class CreateMixIn:
status = "E"
if status == "C":
self.print_warning_instance(FileChangedWarning(path))
self.print_file_status(status, path)
self.print_file_status(status, path, phase="end")
if not dry_run and status is not None:
fso.stats.files_stats[status] += 1
if args.paths_from_command or args.paths_from_shell_command:
@ -167,7 +167,7 @@ class CreateMixIn:
status = "E"
else:
status = "+" # included
self.print_file_status(status, path)
self.print_file_status(status, path, phase="end")
if not dry_run and status is not None:
fso.stats.files_stats[status] += 1
continue
@ -293,134 +293,147 @@ class CreateMixIn:
"""
Call the right method on the given FilesystemObjectProcessor.
"""
if dry_run:
return "+" # included
MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0")
for retry in range(MAX_RETRIES):
last_try = retry == MAX_RETRIES - 1
try:
if stat.S_ISREG(st.st_mode):
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
# Types not archived: no list start/end pair (matches prior behavior of no status line).
if stat.S_ISSOCK(st.st_mode):
return
elif stat.S_ISDOOR(st.st_mode):
return
elif stat.S_ISPORT(st.st_mode):
return
m = st.st_mode
if not (
stat.S_ISREG(m)
or stat.S_ISDIR(m)
or stat.S_ISLNK(m)
or stat.S_ISFIFO(m)
or stat.S_ISCHR(m)
or stat.S_ISBLK(m)
):
self.print_warning("Unknown file type: %s", path)
return
# Emit START once, before any processing, before the retry loop.
self.print_file_status(None, path, phase="start")
try:
MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0")
for retry in range(MAX_RETRIES):
last_try = retry == MAX_RETRIES - 1
try:
if stat.S_ISREG(st.st_mode):
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
last_try=last_try,
strip_prefix=strip_prefix,
)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st_target,
cache=cache,
flags=flags_special_follow,
last_try=last_try,
strip_prefix=strip_prefix,
)
else:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st_target,
st=st,
cache=cache,
flags=flags_special_follow,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
else:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c", strip_prefix=strip_prefix
)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b", strip_prefix=strip_prefix
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
else:
self.print_warning("Unknown file type: %s", path)
return
except BackupItemExcluded:
return "-"
except BackupError as err:
if isinstance(err, BackupOSError):
if err.errno in (errno.EPERM, errno.EACCES):
raise
sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2)
time.sleep(sleep_s)
if retry < MAX_RETRIES - 1:
logger.warning(
f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..."
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c", strip_prefix=strip_prefix
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b", strip_prefix=strip_prefix
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
elif stat.S_ISDOOR(st.st_mode):
# Ignore Solaris doors
return
elif stat.S_ISPORT(st.st_mode):
# Ignore Solaris event ports
return
else:
self.print_warning("Unknown file type: %s", path)
return
except BackupItemExcluded:
return "-"
except BackupError as err:
if isinstance(err, BackupOSError):
if err.errno in (errno.EPERM, errno.EACCES):
# Do not try again, such errors can not be fixed by retrying.
raise
# sleep a bit, so temporary problems might go away...
sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ...
time.sleep(sleep_s)
if retry < MAX_RETRIES - 1:
logger.warning(
f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..."
)
else:
# giving up with retries, error will be dealt with (logged) by upper error handler
raise
# we better do a fresh stat on the file, just to make sure to get the current file
# mode right (which could have changed due to a race condition and is important for
# dispatching) and also to get current inode number of that file.
with backup_io("stat"):
st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False)
with backup_io("stat"):
st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False)
finally:
# END is always emitted here — after ALL processing including chunked I/O,
# even on exception, even on retry exhaustion.
self.print_file_status(None, path, phase="end")
def _rec_walk(
self,

View file

@ -1155,6 +1155,133 @@ def test_create_with_compression_algorithms(archivers, request):
assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))
def test_file_status_phase_regular_file(archivers, request):
    """A regular file must get a start/end lifecycle pair, start first, end last."""
    archiver = request.getfixturevalue(archivers)
    create_regular_file(archiver.input_path, "file1", size=1024 * 80)
    cmd(archiver, "repo-create", RK_ENCRYPTION)
    output = cmd(archiver, "create", "test", "input", "--log-json", "--list")
    parsed = [json.loads(line) for line in output.splitlines() if line.startswith("{")]
    phases = [
        ev.get("phase")
        for ev in parsed
        if ev.get("type") == "file_status" and ev.get("path", "").endswith("file1")
    ]
    # both lifecycle markers must be present, in order
    assert "start" in phases
    assert "end" in phases
    assert phases.index("start") < phases.index("end")
    # the very last event for this file is the "end" marker
    assert phases[-1] == "end"
def test_file_status_phase_symlink(archivers, request):
    """A symlink must get a start lifecycle event before its end event."""
    if not are_symlinks_supported():
        pytest.skip("symlinks not supported")
    archiver = request.getfixturevalue(archivers)
    # symlink may be created before its target; borg archives the link itself
    os.symlink("file1", os.path.join(archiver.input_path, "link1"))
    create_regular_file(archiver.input_path, "file1", size=1024)
    cmd(archiver, "repo-create", RK_ENCRYPTION)
    output = cmd(archiver, "create", "test", "input", "--log-json", "--list")
    parsed = [json.loads(line) for line in output.splitlines() if line.startswith("{")]
    phases = [
        ev.get("phase")
        for ev in parsed
        if ev.get("type") == "file_status" and ev.get("path", "").endswith("link1")
    ]
    assert "start" in phases
    assert "end" in phases
    assert phases.index("start") < phases.index("end")
def test_file_status_phase_read_special(archivers, request):
    """Lifecycle events must also appear for --read-special paths.

    Regression test: with --read-special, special file types (symlinks, FIFOs,
    char/block devices) used to miss their start/end lifecycle events entirely.
    """
    if not are_symlinks_supported():
        pytest.skip("symlinks not supported")
    archiver = request.getfixturevalue(archivers)
    # a regular file plus an absolute symlink pointing at it
    create_regular_file(archiver.input_path, "target", size=1024)
    os.symlink(
        os.path.join(archiver.input_path, "target"),
        os.path.join(archiver.input_path, "link_to_target")
    )
    cmd(archiver, "repo-create", RK_ENCRYPTION)
    output = cmd(archiver, "create", "test", "input", "--read-special", "--log-json", "--list")
    parsed = [json.loads(line) for line in output.splitlines() if line.startswith("{")]
    phases = [
        ev.get("phase")
        for ev in parsed
        if ev.get("type") == "file_status" and ev.get("path", "").endswith("link_to_target")
    ]
    # would fail before the fix: --read-special symlinks got no lifecycle events
    assert "start" in phases, "start event missing for --read-special symlink"
    assert "end" in phases, "end event missing for --read-special symlink"
    assert phases.index("start") < phases.index("end")
def test_file_status_phase_no_orphan_events(archivers, request):
    """Every start must pair with exactly one end, per path, in order.

    No orphan events: an end without a start, or a start without an end,
    would indicate a broken lifecycle emission.
    """
    archiver = request.getfixturevalue(archivers)
    create_regular_file(archiver.input_path, "file1", size=1024 * 80)
    create_regular_file(archiver.input_path, "file2", size=1024 * 80)
    create_regular_file(archiver.input_path, "dir1/file3", size=1024)
    cmd(archiver, "repo-create", RK_ENCRYPTION)
    output = cmd(archiver, "create", "test", "input", "--log-json", "--list")
    parsed = [json.loads(line) for line in output.splitlines() if line.startswith("{")]
    # collect the lifecycle phases per path, in emission order
    per_path = {}
    for ev in parsed:
        if ev.get("type") != "file_status":
            continue
        if ev.get("phase") in ("start", "end"):
            per_path.setdefault(ev.get("path"), []).append(ev.get("phase"))
    for path, phases in per_path.items():
        starts = phases.count("start")
        ends = phases.count("end")
        assert starts == ends, (
            f"{path}: mismatched lifecycle events — {starts} start(s), {ends} end(s)"
        )
        assert starts == 1, (
            f"{path}: expected exactly 1 start/end pair, got {starts}"
        )
        assert phases[0] == "start", f"{path}: first event is not 'start'"
        assert phases[-1] == "end", f"{path}: last event is not 'end'"
def test_file_status_phase_excluded_no_lifecycle(archivers, request):
    """Excluded files must emit neither a start nor an end lifecycle event."""
    archiver = request.getfixturevalue(archivers)
    create_regular_file(archiver.input_path, "file1", size=1024 * 80)
    create_regular_file(archiver.input_path, "excluded_file", size=1024 * 80)
    cmd(archiver, "repo-create", RK_ENCRYPTION)
    output = cmd(
        archiver, "create", "test", "input",
        "--log-json", "--list", "--exclude", "*/excluded_file"
    )
    parsed = [json.loads(line) for line in output.splitlines() if line.startswith("{")]
    phases = [
        ev.get("phase")
        for ev in parsed
        if ev.get("type") == "file_status" and ev.get("path", "").endswith("excluded_file")
    ]
    # exclusion happens before processing starts, so no lifecycle markers at all
    assert "start" not in phases, "excluded file should not emit start event"
    assert "end" not in phases, "excluded file should not emit end event"
def test_exclude_nodump_dir_with_file(archivers, request):
"""A directory flagged NODUMP and its contents must not be archived."""
archiver = request.getfixturevalue(archivers)