diff --git a/src/borg/remote.py b/src/borg/remote.py index 2e20190db..51820b1a2 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -834,125 +834,132 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. waiting_for = [] maximum_to_send = 0 if wait else self.upload_buffer_size_limit send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise. - while wait or calls: - if self.shutdown_time and time.monotonic() > self.shutdown_time: - # we are shutting this RemoteRepository down already, make sure we do not waste - # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. - logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.', - len(waiting_for), len(self.async_responses)) - return - while waiting_for: - try: - unpacked = self.responses.pop(waiting_for[0]) - waiting_for.pop(0) - if b'exception_class' in unpacked: - handle_error(unpacked) - else: - yield unpacked[RESULT] - if not waiting_for and not calls: - return - except KeyError: - break - if cmd == 'async_responses': - while True: + try: + while wait or calls: + if self.shutdown_time and time.monotonic() > self.shutdown_time: + # we are shutting this RemoteRepository down already, make sure we do not waste + # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.', + len(waiting_for), len(self.async_responses)) + return + while waiting_for: try: - msgid, unpacked = self.async_responses.popitem() - except KeyError: - # there is nothing left what we already have received - if async_wait and self.ignore_responses: - # but do not return if we shall wait and there is something left to wait for: - break - else: - return - else: + unpacked = self.responses.pop(waiting_for[0]) + waiting_for.pop(0) if b'exception_class' in unpacked: handle_error(unpacked) else: yield unpacked[RESULT] - if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): - w_fds = [self.stdin_fd] - else: - w_fds = [] - r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) - if x: - raise Exception('FD exception occurred') - for fd in r: - if fd is self.stdout_fd: - data = os.read(fd, BUFSIZE) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - self.unpacker.feed(data) - for unpacked in self.unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - elif isinstance(unpacked, tuple) and len(unpacked) == 4: - # The first field 'type' was always 1 and has always been ignored - _, msgid, error, res = unpacked - if error: - # ignore res, because it is only a fixed string anyway. - unpacked = {MSGID: msgid, b'exception_class': error} + if not waiting_for and not calls: + return + except KeyError: + break + if cmd == 'async_responses': + while True: + try: + msgid, unpacked = self.async_responses.popitem() + except KeyError: + # there is nothing left what we already have received + if async_wait and self.ignore_responses: + # but do not return if we shall wait and there is something left to wait for: + break else: - unpacked = {MSGID: msgid, RESULT: res} + return else: - raise UnexpectedRPCDataFormatFromServer(data) - if msgid in self.ignore_responses: - self.ignore_responses.remove(msgid) - # async methods never return values, but may raise exceptions. if b'exception_class' in unpacked: - self.async_responses[msgid] = unpacked + handle_error(unpacked) else: - # we currently do not have async result values except "None", - # so we do not add them into async_responses. - if unpacked[RESULT] is not None: - self.async_responses[msgid] = unpacked - else: - self.responses[msgid] = unpacked - elif fd is self.stderr_fd: - data = os.read(fd, 32768) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - # deal with incomplete lines (may appear due to block buffering) - if self.stderr_received: - data = self.stderr_received + data - self.stderr_received = b'' - lines = data.splitlines(keepends=True) - if lines and not lines[-1].endswith((b'\r', b'\n')): - self.stderr_received = lines.pop() - # now we have complete lines in and any partial line in self.stderr_received. - for line in lines: - handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences - if w: - while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT: - if calls: - if is_preloaded: - assert cmd == 'get', "is_preload is only supported for 'get'" - if calls[0]['id'] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(calls.pop(0)['id'])) - else: - args = calls.pop(0) - if cmd == 'get' and args['id'] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(args['id'])) - else: - self.msgid += 1 - waiting_for.append(self.msgid) - if self.dictFormat: - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) + yield unpacked[RESULT] + if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): + w_fds = [self.stdin_fd] + else: + w_fds = [] + r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) + if x: + raise Exception('FD exception occurred') + for fd in r: + if fd is self.stdout_fd: + data = os.read(fd, BUFSIZE) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + self.unpacker.feed(data) + for unpacked in self.unpacker: + if isinstance(unpacked, dict): + msgid = unpacked[MSGID] + elif isinstance(unpacked, tuple) and len(unpacked) == 4: + # The first field 'type' was always 1 and has always been ignored + _, msgid, error, res = unpacked + if error: + # ignore res, because it is only a fixed string anyway. + unpacked = {MSGID: msgid, b'exception_class': error} else: - self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))) - if not self.to_send and self.preload_ids: - chunk_id = self.preload_ids.pop(0) - args = {'id': chunk_id} - self.msgid += 1 - self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - if self.dictFormat: - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args})) - else: - self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args)))) + unpacked = {MSGID: msgid, RESULT: res} + else: + raise UnexpectedRPCDataFormatFromServer(data) + if msgid in self.ignore_responses: + self.ignore_responses.remove(msgid) + # async methods never return values, but may raise exceptions. + if b'exception_class' in unpacked: + self.async_responses[msgid] = unpacked + else: + # we currently do not have async result values except "None", + # so we do not add them into async_responses. + if unpacked[RESULT] is not None: + self.async_responses[msgid] = unpacked + else: + self.responses[msgid] = unpacked + elif fd is self.stderr_fd: + data = os.read(fd, 32768) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + # deal with incomplete lines (may appear due to block buffering) + if self.stderr_received: + data = self.stderr_received + data + self.stderr_received = b'' + lines = data.splitlines(keepends=True) + if lines and not lines[-1].endswith((b'\r', b'\n')): + self.stderr_received = lines.pop() + # now we have complete lines in and any partial line in self.stderr_received. + for line in lines: + handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences + if w: + while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT: + if calls: + if is_preloaded: + assert cmd == 'get', "is_preload is only supported for 'get'" + if calls[0]['id'] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(calls.pop(0)['id'])) + else: + args = calls.pop(0) + if cmd == 'get' and args['id'] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(args['id'])) + else: + self.msgid += 1 + waiting_for.append(self.msgid) + if self.dictFormat: + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) + else: + self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))) + if not self.to_send and self.preload_ids: + chunk_id = self.preload_ids.pop(0) + args = {'id': chunk_id} + self.msgid += 1 + self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) + if self.dictFormat: + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args})) + else: + self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args)))) - send_buffer() - self.ignore_responses |= set(waiting_for) # we lose order here + send_buffer() + finally: + self.ignore_responses |= set(waiting_for) # we lose order here + if is_preloaded: + for call in calls: + chunkid = call['id'] + if chunkid in self.chunkid_to_msgids: + self.ignore_responses.add(pop_preload_msgid(chunkid)) @api(since=parse_version('1.0.0'), append_only={'since': parse_version('1.0.7'), 'previously': False},