From f15adc5dcf8ed5ed863f5ce42ce996940100f4d9 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 8 May 2026 18:40:51 +0200 Subject: [PATCH] extract: resolve memory leak on abandoned async requests in RemoteRepository When a generator for get_many() or call_many() is destroyed early (for example, if a BackupError occurs during extraction and aborts fetching preloaded chunks), a GeneratorExit is raised inside call_many(). Previously, call_many() lacked a try/finally block, so it failed to mark the abandoned msgids in self.ignore_responses. When the remote server eventually sent the data, it was indefinitely cached in self.responses and self.chunkid_to_msgids, causing a memory leak. This fix wraps the request loop in try/finally to guarantee that all pending waiting_for message IDs, as well as any unrequested preloaded chunk IDs in calls, are properly added to ignore_responses. For example, this memory leak could be triggered when extracting files: - by permission errors or other OSErrors with the extracted file - if the archived file had all-zero replacement chunks or inconsistent size --- src/borg/legacy/remote.py | 237 ++++++++++++++++++------------------ src/borg/remote.py | 245 ++++++++++++++++++++------------------ 2 files changed, 248 insertions(+), 234 deletions(-) diff --git a/src/borg/legacy/remote.py b/src/borg/legacy/remote.py index d94215a64..5c4eee6eb 100644 --- a/src/borg/legacy/remote.py +++ b/src/borg/legacy/remote.py @@ -506,128 +506,135 @@ class LegacyRemoteRepository: waiting_for = [] maximum_to_send = 0 if wait else self.upload_buffer_size_limit send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise. - while wait or calls: - if self.shutdown_time and time.monotonic() > self.shutdown_time: - # we are shutting this LegacyRemoteRepository down already, make sure we do not waste - # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. - logger.debug( - "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", - len(waiting_for), - len(self.async_responses), - ) - return - while waiting_for: - try: - unpacked = self.responses.pop(waiting_for[0]) - waiting_for.pop(0) - handle_error(unpacked) - yield unpacked[RESULT] - if not waiting_for and not calls: - return - except KeyError: - break - if cmd == "async_responses": - while True: + try: + while wait or calls: + if self.shutdown_time and time.monotonic() > self.shutdown_time: + # we are shutting this LegacyRemoteRepository down already, make sure we do not waste + # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + logger.debug( + "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", + len(waiting_for), + len(self.async_responses), + ) + return + while waiting_for: try: - msgid, unpacked = self.async_responses.popitem() - except KeyError: - # there is nothing left what we already have received - if async_wait and self.ignore_responses: - # but do not return if we shall wait and there is something left to wait for: - break - else: - return - else: + unpacked = self.responses.pop(waiting_for[0]) + waiting_for.pop(0) handle_error(unpacked) yield unpacked[RESULT] - if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): - w_fds = [self.stdin_fd] - else: - w_fds = [] - r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) - if x: - raise Exception("FD exception occurred") - for fd in r: - if fd is self.stdout_fd: - data = os.read(fd, BUFSIZE) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - self.unpacker.feed(data) - for unpacked in self.unpacker: - if not isinstance(unpacked, dict): - raise UnexpectedRPCDataFormatFromServer(data) - - lr_dict = unpacked.get(LOG) - if lr_dict is not None: - # Re-emit remote log messages locally. - _logger = logging.getLogger(lr_dict["name"]) - if _logger.isEnabledFor(lr_dict["level"]): - _logger.handle(logging.LogRecord(**lr_dict)) - continue - - msgid = unpacked[MSGID] - if msgid in self.ignore_responses: - self.ignore_responses.remove(msgid) - # async methods never return values, but may raise exceptions. - if "exception_class" in unpacked: - self.async_responses[msgid] = unpacked + if not waiting_for and not calls: + return + except KeyError: + break + if cmd == "async_responses": + while True: + try: + msgid, unpacked = self.async_responses.popitem() + except KeyError: + # there is nothing left what we already have received + if async_wait and self.ignore_responses: + # but do not return if we shall wait and there is something left to wait for: + break else: - # we currently do not have async result values except "None", - # so we do not add them into async_responses. - if unpacked[RESULT] is not None: + return + else: + handle_error(unpacked) + yield unpacked[RESULT] + if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): + w_fds = [self.stdin_fd] + else: + w_fds = [] + r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) + if x: + raise Exception("FD exception occurred") + for fd in r: + if fd is self.stdout_fd: + data = os.read(fd, BUFSIZE) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + self.unpacker.feed(data) + for unpacked in self.unpacker: + if not isinstance(unpacked, dict): + raise UnexpectedRPCDataFormatFromServer(data) + + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] + if msgid in self.ignore_responses: + self.ignore_responses.remove(msgid) + # async methods never return values, but may raise exceptions. + if "exception_class" in unpacked: self.async_responses[msgid] = unpacked - else: - self.responses[msgid] = unpacked - elif fd is self.stderr_fd: - data = os.read(fd, 32768) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - # deal with incomplete lines (may appear due to block buffering) - if self.stderr_received: - data = self.stderr_received + data - self.stderr_received = b"" - lines = data.splitlines(keepends=True) - if lines and not lines[-1].endswith((b"\r", b"\n")): - self.stderr_received = lines.pop() - # now we have complete lines in and any partial line in self.stderr_received. - _logger = logging.getLogger() - for line in lines: - # borg serve (remote/server side) should not emit stuff on stderr, - # but e.g. the ssh process (local/client side) might output errors there. - assert line.endswith((b"\r", b"\n")) - # something came in on stderr, log it to not lose it. - # decode late, avoid partial utf-8 sequences. - _logger.warning("stderr: " + line.decode().strip()) - if w: - while ( - (len(self.to_send) <= maximum_to_send) - and (calls or self.preload_ids) - and len(waiting_for) < MAX_INFLIGHT - ): - if calls: - if is_preloaded: - assert cmd == "get", "is_preload is only supported for 'get'" - if calls[0]["id"] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(calls.pop(0)["id"])) - else: - args = calls.pop(0) - if cmd == "get" and args["id"] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(args["id"])) + else: + # we currently do not have async result values except "None", + # so we do not add them into async_responses. + if unpacked[RESULT] is not None: + self.async_responses[msgid] = unpacked else: - self.msgid += 1 - waiting_for.append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) - if not self.to_send and self.preload_ids: - chunk_id = self.preload_ids.pop(0) - args = {"id": chunk_id} - self.msgid += 1 - self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) + self.responses[msgid] = unpacked + elif fd is self.stderr_fd: + data = os.read(fd, 32768) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + # deal with incomplete lines (may appear due to block buffering) + if self.stderr_received: + data = self.stderr_received + data + self.stderr_received = b"" + lines = data.splitlines(keepends=True) + if lines and not lines[-1].endswith((b"\r", b"\n")): + self.stderr_received = lines.pop() + # now we have complete lines in and any partial line in self.stderr_received. + _logger = logging.getLogger() + for line in lines: + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. + _logger.warning("stderr: " + line.decode().strip()) + if w: + while ( + (len(self.to_send) <= maximum_to_send) + and (calls or self.preload_ids) + and len(waiting_for) < MAX_INFLIGHT + ): + if calls: + if is_preloaded: + assert cmd == "get", "is_preload is only supported for 'get'" + if calls[0]["id"] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(calls.pop(0)["id"])) + else: + args = calls.pop(0) + if cmd == "get" and args["id"] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(args["id"])) + else: + self.msgid += 1 + waiting_for.append(self.msgid) + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) + if not self.to_send and self.preload_ids: + chunk_id = self.preload_ids.pop(0) + args = {"id": chunk_id} + self.msgid += 1 + self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) - send_buffer() - self.ignore_responses |= set(waiting_for) # we lose order here + send_buffer() + finally: + self.ignore_responses |= set(waiting_for) # we lose order here + if is_preloaded: + for call in calls: + chunkid = call["id"] + if chunkid in self.chunkid_to_msgids: + self.ignore_responses.add(pop_preload_msgid(chunkid)) @api(since=parse_version("1.0.0"), v1_or_v2={"since": parse_version("2.0.0b10"), "previously": True}) def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_or_v2=False): diff --git a/src/borg/remote.py b/src/borg/remote.py index b8ff6d3c3..5af3c41a1 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -818,133 +818,140 @@ class RemoteRepository: waiting_for = [] maximum_to_send = 0 if wait else self.upload_buffer_size_limit send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise. - while wait or calls: - logger.debug( - f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}" - ) - if self.shutdown_time and time.monotonic() > self.shutdown_time: - # we are shutting this RemoteRepository down already, make sure we do not waste - # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + try: + while wait or calls: logger.debug( - "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", - len(waiting_for), - len(self.async_responses), + f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}" ) - return - while waiting_for: - try: - unpacked = self.responses.pop(waiting_for[0]) - waiting_for.pop(0) - handle_error(unpacked) - yield unpacked[RESULT] - if not waiting_for and not calls: - return - except KeyError: - break - if cmd == "async_responses": - while True: + if self.shutdown_time and time.monotonic() > self.shutdown_time: + # we are shutting this RemoteRepository down already, make sure we do not waste + # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + logger.debug( + "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", + len(waiting_for), + len(self.async_responses), + ) + return + while waiting_for: try: - msgid, unpacked = self.async_responses.popitem() - except KeyError: - # there is nothing left what we already have received - if async_wait and self.ignore_responses: - # but do not return if we shall wait and there is something left to wait for: - break - else: - return - else: + unpacked = self.responses.pop(waiting_for[0]) + waiting_for.pop(0) handle_error(unpacked) yield unpacked[RESULT] - if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): - w_fds = [self.stdin_fd] - else: - w_fds = [] - r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) - if x: - raise Exception("FD exception occurred") - for fd in r: - if fd is self.stdout_fd: - data = os.read(fd, BUFSIZE) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - self.unpacker.feed(data) - for unpacked in self.unpacker: - if not isinstance(unpacked, dict): - raise UnexpectedRPCDataFormatFromServer(data) - - lr_dict = unpacked.get(LOG) - if lr_dict is not None: - # Re-emit remote log messages locally. - _logger = logging.getLogger(lr_dict["name"]) - if _logger.isEnabledFor(lr_dict["level"]): - _logger.handle(logging.LogRecord(**lr_dict)) - continue - - msgid = unpacked[MSGID] - if msgid in self.ignore_responses: - self.ignore_responses.remove(msgid) - # async methods never return values, but may raise exceptions. - if "exception_class" in unpacked: - self.async_responses[msgid] = unpacked + if not waiting_for and not calls: + return + except KeyError: + break + if cmd == "async_responses": + while True: + try: + msgid, unpacked = self.async_responses.popitem() + except KeyError: + # there is nothing left what we already have received + if async_wait and self.ignore_responses: + # but do not return if we shall wait and there is something left to wait for: + break else: - # we currently do not have async result values except "None", - # so we do not add them into async_responses. - if unpacked[RESULT] is not None: - self.async_responses[msgid] = unpacked + return else: - self.responses[msgid] = unpacked - elif fd is self.stderr_fd: - data = os.read(fd, 32768) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - # deal with incomplete lines (may appear due to block buffering) - if self.stderr_received: - data = self.stderr_received + data - self.stderr_received = b"" - lines = data.splitlines(keepends=True) - if lines and not lines[-1].endswith((b"\r", b"\n")): - self.stderr_received = lines.pop() - # now we have complete lines in and any partial line in self.stderr_received. - _logger = logging.getLogger() - for line in lines: - # borg serve (remote/server side) should not emit stuff on stderr, - # but e.g. the ssh process (local/client side) might output errors there. - assert line.endswith((b"\r", b"\n")) - # something came in on stderr, log it to not lose it. - # decode late, avoid partial utf-8 sequences. - _logger.warning("stderr: " + line.decode().strip()) - if w: - while ( - (len(self.to_send) <= maximum_to_send) - and (calls or self.preload_ids) - and len(waiting_for) < MAX_INFLIGHT - ): - if calls: - args = calls[0] - if cmd == "get" and args["id"] in self.chunkid_to_msgids: - # we have a get command and have already sent a request for this chunkid when - # doing preloading, so we know the msgid of the response we are waiting for: - waiting_for.append(pop_preload_msgid(args["id"])) - del calls[0] - elif not is_preloaded: - # make and send a request (already done if we are using preloading) - self.msgid += 1 - waiting_for.append(self.msgid) - del calls[0] - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) - if not self.to_send and self.preload_ids: - chunk_id = self.preload_ids.pop(0) - # for preloading chunks, the raise_missing behaviour is defined HERE, - # not in the get_many / fetch_many call that later fetches the preloaded chunks. - args = {"id": chunk_id, "raise_missing": False} - self.msgid += 1 - self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) + handle_error(unpacked) + yield unpacked[RESULT] + if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): + w_fds = [self.stdin_fd] + else: + w_fds = [] + r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) + if x: + raise Exception("FD exception occurred") + for fd in r: + if fd is self.stdout_fd: + data = os.read(fd, BUFSIZE) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + self.unpacker.feed(data) + for unpacked in self.unpacker: + if not isinstance(unpacked, dict): + raise UnexpectedRPCDataFormatFromServer(data) - send_buffer() - self.ignore_responses |= set(waiting_for) # we lose order here + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] + if msgid in self.ignore_responses: + self.ignore_responses.remove(msgid) + # async methods never return values, but may raise exceptions. + if "exception_class" in unpacked: + self.async_responses[msgid] = unpacked + else: + # we currently do not have async result values except "None", + # so we do not add them into async_responses. + if unpacked[RESULT] is not None: + self.async_responses[msgid] = unpacked + else: + self.responses[msgid] = unpacked + elif fd is self.stderr_fd: + data = os.read(fd, 32768) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + # deal with incomplete lines (may appear due to block buffering) + if self.stderr_received: + data = self.stderr_received + data + self.stderr_received = b"" + lines = data.splitlines(keepends=True) + if lines and not lines[-1].endswith((b"\r", b"\n")): + self.stderr_received = lines.pop() + # now we have complete lines in and any partial line in self.stderr_received. + _logger = logging.getLogger() + for line in lines: + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. + _logger.warning("stderr: " + line.decode().strip()) + if w: + while ( + (len(self.to_send) <= maximum_to_send) + and (calls or self.preload_ids) + and len(waiting_for) < MAX_INFLIGHT + ): + if calls: + args = calls[0] + if cmd == "get" and args["id"] in self.chunkid_to_msgids: + # we have a get command and have already sent a request for this chunkid when + # doing preloading, so we know the msgid of the response we are waiting for: + waiting_for.append(pop_preload_msgid(args["id"])) + del calls[0] + elif not is_preloaded: + # make and send a request (already done if we are using preloading) + self.msgid += 1 + waiting_for.append(self.msgid) + del calls[0] + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) + if not self.to_send and self.preload_ids: + chunk_id = self.preload_ids.pop(0) + # for preloading chunks, the raise_missing behaviour is defined HERE, + # not in the get_many / fetch_many call that later fetches the preloaded chunks. + args = {"id": chunk_id, "raise_missing": False} + self.msgid += 1 + self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) + + send_buffer() + finally: + self.ignore_responses |= set(waiting_for) # we lose order here + if is_preloaded: + for call in calls: + chunkid = call["id"] + if chunkid in self.chunkid_to_msgids: + self.ignore_responses.add(pop_preload_msgid(chunkid)) @api(since=parse_version("1.0.0"), v1_or_v2={"since": parse_version("2.0.0b9"), "previously": True}) def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_or_v2=False):