extract: resolve memory leak on abandoned async requests in RemoteRepository

When a generator for get_many() or call_many() is destroyed early (for example, if a BackupError occurs during extraction and aborts fetching preloaded chunks), a GeneratorExit is raised inside call_many().

Previously, call_many() lacked a try/finally block, so it failed to mark the abandoned msgids in self.ignore_responses. When the remote server eventually sent the data, it was indefinitely cached in self.responses and self.chunkid_to_msgids, causing a memory leak.

This fix wraps the request loop in try/finally to guarantee that all pending waiting_for message IDs, as well as any unrequested preloaded chunk IDs in calls, are properly added to ignore_responses.

For example, this memory leak could be triggered when extracting files:
- by permission errors or other OSErrors with the extracted file
- if the archived file had all-zero replacement chunks or inconsistent size
This commit is contained in:
Thomas Waldmann 2026-05-08 18:40:51 +02:00
parent c409e767b9
commit f15adc5dcf
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 248 additions and 234 deletions

View file

@ -506,128 +506,135 @@ class LegacyRemoteRepository:
waiting_for = []
maximum_to_send = 0 if wait else self.upload_buffer_size_limit
send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise.
while wait or calls:
if self.shutdown_time and time.monotonic() > self.shutdown_time:
# we are shutting this LegacyRemoteRepository down already, make sure we do not waste
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
logger.debug(
"shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
len(waiting_for),
len(self.async_responses),
)
return
while waiting_for:
try:
unpacked = self.responses.pop(waiting_for[0])
waiting_for.pop(0)
handle_error(unpacked)
yield unpacked[RESULT]
if not waiting_for and not calls:
return
except KeyError:
break
if cmd == "async_responses":
while True:
try:
while wait or calls:
if self.shutdown_time and time.monotonic() > self.shutdown_time:
# we are shutting this LegacyRemoteRepository down already, make sure we do not waste
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
logger.debug(
"shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
len(waiting_for),
len(self.async_responses),
)
return
while waiting_for:
try:
msgid, unpacked = self.async_responses.popitem()
except KeyError:
# there is nothing left what we already have received
if async_wait and self.ignore_responses:
# but do not return if we shall wait and there is something left to wait for:
break
else:
return
else:
unpacked = self.responses.pop(waiting_for[0])
waiting_for.pop(0)
handle_error(unpacked)
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
w_fds = []
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
if x:
raise Exception("FD exception occurred")
for fd in r:
if fd is self.stdout_fd:
data = os.read(fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not isinstance(unpacked, dict):
raise UnexpectedRPCDataFormatFromServer(data)
lr_dict = unpacked.get(LOG)
if lr_dict is not None:
# Re-emit remote log messages locally.
_logger = logging.getLogger(lr_dict["name"])
if _logger.isEnabledFor(lr_dict["level"]):
_logger.handle(logging.LogRecord(**lr_dict))
continue
msgid = unpacked[MSGID]
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
if "exception_class" in unpacked:
self.async_responses[msgid] = unpacked
if not waiting_for and not calls:
return
except KeyError:
break
if cmd == "async_responses":
while True:
try:
msgid, unpacked = self.async_responses.popitem()
except KeyError:
# there is nothing left what we already have received
if async_wait and self.ignore_responses:
# but do not return if we shall wait and there is something left to wait for:
break
else:
# we currently do not have async result values except "None",
# so we do not add them into async_responses.
if unpacked[RESULT] is not None:
return
else:
handle_error(unpacked)
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
w_fds = []
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
if x:
raise Exception("FD exception occurred")
for fd in r:
if fd is self.stdout_fd:
data = os.read(fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not isinstance(unpacked, dict):
raise UnexpectedRPCDataFormatFromServer(data)
lr_dict = unpacked.get(LOG)
if lr_dict is not None:
# Re-emit remote log messages locally.
_logger = logging.getLogger(lr_dict["name"])
if _logger.isEnabledFor(lr_dict["level"]):
_logger.handle(logging.LogRecord(**lr_dict))
continue
msgid = unpacked[MSGID]
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
if "exception_class" in unpacked:
self.async_responses[msgid] = unpacked
else:
self.responses[msgid] = unpacked
elif fd is self.stderr_fd:
data = os.read(fd, 32768)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
# deal with incomplete lines (may appear due to block buffering)
if self.stderr_received:
data = self.stderr_received + data
self.stderr_received = b""
lines = data.splitlines(keepends=True)
if lines and not lines[-1].endswith((b"\r", b"\n")):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
_logger = logging.getLogger()
for line in lines:
# borg serve (remote/server side) should not emit stuff on stderr,
# but e.g. the ssh process (local/client side) might output errors there.
assert line.endswith((b"\r", b"\n"))
# something came in on stderr, log it to not lose it.
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
and (calls or self.preload_ids)
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
if is_preloaded:
assert cmd == "get", "is_preload is only supported for 'get'"
if calls[0]["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
else:
args = calls.pop(0)
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(args["id"]))
else:
# we currently do not have async result values except "None",
# so we do not add them into async_responses.
if unpacked[RESULT] is not None:
self.async_responses[msgid] = unpacked
else:
self.msgid += 1
waiting_for.append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
args = {"id": chunk_id}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
self.responses[msgid] = unpacked
elif fd is self.stderr_fd:
data = os.read(fd, 32768)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
# deal with incomplete lines (may appear due to block buffering)
if self.stderr_received:
data = self.stderr_received + data
self.stderr_received = b""
lines = data.splitlines(keepends=True)
if lines and not lines[-1].endswith((b"\r", b"\n")):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
_logger = logging.getLogger()
for line in lines:
# borg serve (remote/server side) should not emit stuff on stderr,
# but e.g. the ssh process (local/client side) might output errors there.
assert line.endswith((b"\r", b"\n"))
# something came in on stderr, log it to not lose it.
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
and (calls or self.preload_ids)
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
if is_preloaded:
assert cmd == "get", "is_preload is only supported for 'get'"
if calls[0]["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
else:
args = calls.pop(0)
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
waiting_for.append(pop_preload_msgid(args["id"]))
else:
self.msgid += 1
waiting_for.append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
args = {"id": chunk_id}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
send_buffer()
self.ignore_responses |= set(waiting_for) # we lose order here
send_buffer()
finally:
self.ignore_responses |= set(waiting_for) # we lose order here
if is_preloaded:
for call in calls:
chunkid = call["id"]
if chunkid in self.chunkid_to_msgids:
self.ignore_responses.add(pop_preload_msgid(chunkid))
@api(since=parse_version("1.0.0"), v1_or_v2={"since": parse_version("2.0.0b10"), "previously": True})
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_or_v2=False):

View file

@ -818,133 +818,140 @@ class RemoteRepository:
waiting_for = []
maximum_to_send = 0 if wait else self.upload_buffer_size_limit
send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise.
while wait or calls:
logger.debug(
f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}"
)
if self.shutdown_time and time.monotonic() > self.shutdown_time:
# we are shutting this RemoteRepository down already, make sure we do not waste
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
try:
while wait or calls:
logger.debug(
"shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
len(waiting_for),
len(self.async_responses),
f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}"
)
return
while waiting_for:
try:
unpacked = self.responses.pop(waiting_for[0])
waiting_for.pop(0)
handle_error(unpacked)
yield unpacked[RESULT]
if not waiting_for and not calls:
return
except KeyError:
break
if cmd == "async_responses":
while True:
if self.shutdown_time and time.monotonic() > self.shutdown_time:
# we are shutting this RemoteRepository down already, make sure we do not waste
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
logger.debug(
"shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
len(waiting_for),
len(self.async_responses),
)
return
while waiting_for:
try:
msgid, unpacked = self.async_responses.popitem()
except KeyError:
# there is nothing left what we already have received
if async_wait and self.ignore_responses:
# but do not return if we shall wait and there is something left to wait for:
break
else:
return
else:
unpacked = self.responses.pop(waiting_for[0])
waiting_for.pop(0)
handle_error(unpacked)
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
w_fds = []
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
if x:
raise Exception("FD exception occurred")
for fd in r:
if fd is self.stdout_fd:
data = os.read(fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not isinstance(unpacked, dict):
raise UnexpectedRPCDataFormatFromServer(data)
lr_dict = unpacked.get(LOG)
if lr_dict is not None:
# Re-emit remote log messages locally.
_logger = logging.getLogger(lr_dict["name"])
if _logger.isEnabledFor(lr_dict["level"]):
_logger.handle(logging.LogRecord(**lr_dict))
continue
msgid = unpacked[MSGID]
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
if "exception_class" in unpacked:
self.async_responses[msgid] = unpacked
if not waiting_for and not calls:
return
except KeyError:
break
if cmd == "async_responses":
while True:
try:
msgid, unpacked = self.async_responses.popitem()
except KeyError:
# there is nothing left what we already have received
if async_wait and self.ignore_responses:
# but do not return if we shall wait and there is something left to wait for:
break
else:
# we currently do not have async result values except "None",
# so we do not add them into async_responses.
if unpacked[RESULT] is not None:
self.async_responses[msgid] = unpacked
return
else:
self.responses[msgid] = unpacked
elif fd is self.stderr_fd:
data = os.read(fd, 32768)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
# deal with incomplete lines (may appear due to block buffering)
if self.stderr_received:
data = self.stderr_received + data
self.stderr_received = b""
lines = data.splitlines(keepends=True)
if lines and not lines[-1].endswith((b"\r", b"\n")):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
_logger = logging.getLogger()
for line in lines:
# borg serve (remote/server side) should not emit stuff on stderr,
# but e.g. the ssh process (local/client side) might output errors there.
assert line.endswith((b"\r", b"\n"))
# something came in on stderr, log it to not lose it.
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
and (calls or self.preload_ids)
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
args = calls[0]
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
# we have a get command and have already sent a request for this chunkid when
# doing preloading, so we know the msgid of the response we are waiting for:
waiting_for.append(pop_preload_msgid(args["id"]))
del calls[0]
elif not is_preloaded:
# make and send a request (already done if we are using preloading)
self.msgid += 1
waiting_for.append(self.msgid)
del calls[0]
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
# for preloading chunks, the raise_missing behaviour is defined HERE,
# not in the get_many / fetch_many call that later fetches the preloaded chunks.
args = {"id": chunk_id, "raise_missing": False}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
handle_error(unpacked)
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
w_fds = []
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
if x:
raise Exception("FD exception occurred")
for fd in r:
if fd is self.stdout_fd:
data = os.read(fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not isinstance(unpacked, dict):
raise UnexpectedRPCDataFormatFromServer(data)
send_buffer()
self.ignore_responses |= set(waiting_for) # we lose order here
lr_dict = unpacked.get(LOG)
if lr_dict is not None:
# Re-emit remote log messages locally.
_logger = logging.getLogger(lr_dict["name"])
if _logger.isEnabledFor(lr_dict["level"]):
_logger.handle(logging.LogRecord(**lr_dict))
continue
msgid = unpacked[MSGID]
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
if "exception_class" in unpacked:
self.async_responses[msgid] = unpacked
else:
# we currently do not have async result values except "None",
# so we do not add them into async_responses.
if unpacked[RESULT] is not None:
self.async_responses[msgid] = unpacked
else:
self.responses[msgid] = unpacked
elif fd is self.stderr_fd:
data = os.read(fd, 32768)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
# deal with incomplete lines (may appear due to block buffering)
if self.stderr_received:
data = self.stderr_received + data
self.stderr_received = b""
lines = data.splitlines(keepends=True)
if lines and not lines[-1].endswith((b"\r", b"\n")):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
_logger = logging.getLogger()
for line in lines:
# borg serve (remote/server side) should not emit stuff on stderr,
# but e.g. the ssh process (local/client side) might output errors there.
assert line.endswith((b"\r", b"\n"))
# something came in on stderr, log it to not lose it.
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
and (calls or self.preload_ids)
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
args = calls[0]
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
# we have a get command and have already sent a request for this chunkid when
# doing preloading, so we know the msgid of the response we are waiting for:
waiting_for.append(pop_preload_msgid(args["id"]))
del calls[0]
elif not is_preloaded:
# make and send a request (already done if we are using preloading)
self.msgid += 1
waiting_for.append(self.msgid)
del calls[0]
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
# for preloading chunks, the raise_missing behaviour is defined HERE,
# not in the get_many / fetch_many call that later fetches the preloaded chunks.
args = {"id": chunk_id, "raise_missing": False}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
send_buffer()
finally:
self.ignore_responses |= set(waiting_for) # we lose order here
if is_preloaded:
for call in calls:
chunkid = call["id"]
if chunkid in self.chunkid_to_msgids:
self.ignore_responses.add(pop_preload_msgid(chunkid))
@api(since=parse_version("1.0.0"), v1_or_v2={"since": parse_version("2.0.0b9"), "previously": True})
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_or_v2=False):