mirror of
https://github.com/borgbackup/borg.git
synced 2026-05-25 18:53:06 -04:00
Merge pull request #9588 from ThomasWaldmann/fix-remoterepo-1.4
remote repo fixes 1.4
This commit is contained in:
commit
fc39330afe
1 changed files with 115 additions and 108 deletions
|
|
@ -834,125 +834,132 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
waiting_for = []
|
||||
maximum_to_send = 0 if wait else self.upload_buffer_size_limit
|
||||
send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise.
|
||||
while wait or calls:
|
||||
if self.shutdown_time and time.monotonic() > self.shutdown_time:
|
||||
# we are shutting this RemoteRepository down already, make sure we do not waste
|
||||
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
|
||||
logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.',
|
||||
len(waiting_for), len(self.async_responses))
|
||||
return
|
||||
while waiting_for:
|
||||
try:
|
||||
unpacked = self.responses.pop(waiting_for[0])
|
||||
waiting_for.pop(0)
|
||||
if b'exception_class' in unpacked:
|
||||
handle_error(unpacked)
|
||||
else:
|
||||
yield unpacked[RESULT]
|
||||
if not waiting_for and not calls:
|
||||
return
|
||||
except KeyError:
|
||||
break
|
||||
if cmd == 'async_responses':
|
||||
while True:
|
||||
try:
|
||||
while wait or calls:
|
||||
if self.shutdown_time and time.monotonic() > self.shutdown_time:
|
||||
# we are shutting this RemoteRepository down already, make sure we do not waste
|
||||
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
|
||||
logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.',
|
||||
len(waiting_for), len(self.async_responses))
|
||||
return
|
||||
while waiting_for:
|
||||
try:
|
||||
msgid, unpacked = self.async_responses.popitem()
|
||||
except KeyError:
|
||||
# there is nothing left what we already have received
|
||||
if async_wait and self.ignore_responses:
|
||||
# but do not return if we shall wait and there is something left to wait for:
|
||||
break
|
||||
else:
|
||||
return
|
||||
else:
|
||||
unpacked = self.responses.pop(waiting_for[0])
|
||||
waiting_for.pop(0)
|
||||
if b'exception_class' in unpacked:
|
||||
handle_error(unpacked)
|
||||
else:
|
||||
yield unpacked[RESULT]
|
||||
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
|
||||
w_fds = [self.stdin_fd]
|
||||
else:
|
||||
w_fds = []
|
||||
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
|
||||
if x:
|
||||
raise Exception('FD exception occurred')
|
||||
for fd in r:
|
||||
if fd is self.stdout_fd:
|
||||
data = os.read(fd, BUFSIZE)
|
||||
if not data:
|
||||
raise ConnectionClosed()
|
||||
self.rx_bytes += len(data)
|
||||
self.unpacker.feed(data)
|
||||
for unpacked in self.unpacker:
|
||||
if isinstance(unpacked, dict):
|
||||
msgid = unpacked[MSGID]
|
||||
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
|
||||
# The first field 'type' was always 1 and has always been ignored
|
||||
_, msgid, error, res = unpacked
|
||||
if error:
|
||||
# ignore res, because it is only a fixed string anyway.
|
||||
unpacked = {MSGID: msgid, b'exception_class': error}
|
||||
if not waiting_for and not calls:
|
||||
return
|
||||
except KeyError:
|
||||
break
|
||||
if cmd == 'async_responses':
|
||||
while True:
|
||||
try:
|
||||
msgid, unpacked = self.async_responses.popitem()
|
||||
except KeyError:
|
||||
# there is nothing left what we already have received
|
||||
if async_wait and self.ignore_responses:
|
||||
# but do not return if we shall wait and there is something left to wait for:
|
||||
break
|
||||
else:
|
||||
unpacked = {MSGID: msgid, RESULT: res}
|
||||
return
|
||||
else:
|
||||
raise UnexpectedRPCDataFormatFromServer(data)
|
||||
if msgid in self.ignore_responses:
|
||||
self.ignore_responses.remove(msgid)
|
||||
# async methods never return values, but may raise exceptions.
|
||||
if b'exception_class' in unpacked:
|
||||
self.async_responses[msgid] = unpacked
|
||||
handle_error(unpacked)
|
||||
else:
|
||||
# we currently do not have async result values except "None",
|
||||
# so we do not add them into async_responses.
|
||||
if unpacked[RESULT] is not None:
|
||||
self.async_responses[msgid] = unpacked
|
||||
else:
|
||||
self.responses[msgid] = unpacked
|
||||
elif fd is self.stderr_fd:
|
||||
data = os.read(fd, 32768)
|
||||
if not data:
|
||||
raise ConnectionClosed()
|
||||
self.rx_bytes += len(data)
|
||||
# deal with incomplete lines (may appear due to block buffering)
|
||||
if self.stderr_received:
|
||||
data = self.stderr_received + data
|
||||
self.stderr_received = b''
|
||||
lines = data.splitlines(keepends=True)
|
||||
if lines and not lines[-1].endswith((b'\r', b'\n')):
|
||||
self.stderr_received = lines.pop()
|
||||
# now we have complete lines in <lines> and any partial line in self.stderr_received.
|
||||
for line in lines:
|
||||
handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences
|
||||
if w:
|
||||
while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
|
||||
if calls:
|
||||
if is_preloaded:
|
||||
assert cmd == 'get', "is_preload is only supported for 'get'"
|
||||
if calls[0]['id'] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(calls.pop(0)['id']))
|
||||
else:
|
||||
args = calls.pop(0)
|
||||
if cmd == 'get' and args['id'] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(args['id']))
|
||||
else:
|
||||
self.msgid += 1
|
||||
waiting_for.append(self.msgid)
|
||||
if self.dictFormat:
|
||||
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
|
||||
yield unpacked[RESULT]
|
||||
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
|
||||
w_fds = [self.stdin_fd]
|
||||
else:
|
||||
w_fds = []
|
||||
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
|
||||
if x:
|
||||
raise Exception('FD exception occurred')
|
||||
for fd in r:
|
||||
if fd is self.stdout_fd:
|
||||
data = os.read(fd, BUFSIZE)
|
||||
if not data:
|
||||
raise ConnectionClosed()
|
||||
self.rx_bytes += len(data)
|
||||
self.unpacker.feed(data)
|
||||
for unpacked in self.unpacker:
|
||||
if isinstance(unpacked, dict):
|
||||
msgid = unpacked[MSGID]
|
||||
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
|
||||
# The first field 'type' was always 1 and has always been ignored
|
||||
_, msgid, error, res = unpacked
|
||||
if error:
|
||||
# ignore res, because it is only a fixed string anyway.
|
||||
unpacked = {MSGID: msgid, b'exception_class': error}
|
||||
else:
|
||||
self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args))))
|
||||
if not self.to_send and self.preload_ids:
|
||||
chunk_id = self.preload_ids.pop(0)
|
||||
args = {'id': chunk_id}
|
||||
self.msgid += 1
|
||||
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
|
||||
if self.dictFormat:
|
||||
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args}))
|
||||
else:
|
||||
self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args))))
|
||||
unpacked = {MSGID: msgid, RESULT: res}
|
||||
else:
|
||||
raise UnexpectedRPCDataFormatFromServer(data)
|
||||
if msgid in self.ignore_responses:
|
||||
self.ignore_responses.remove(msgid)
|
||||
# async methods never return values, but may raise exceptions.
|
||||
if b'exception_class' in unpacked:
|
||||
self.async_responses[msgid] = unpacked
|
||||
else:
|
||||
# we currently do not have async result values except "None",
|
||||
# so we do not add them into async_responses.
|
||||
if unpacked[RESULT] is not None:
|
||||
self.async_responses[msgid] = unpacked
|
||||
else:
|
||||
self.responses[msgid] = unpacked
|
||||
elif fd is self.stderr_fd:
|
||||
data = os.read(fd, 32768)
|
||||
if not data:
|
||||
raise ConnectionClosed()
|
||||
self.rx_bytes += len(data)
|
||||
# deal with incomplete lines (may appear due to block buffering)
|
||||
if self.stderr_received:
|
||||
data = self.stderr_received + data
|
||||
self.stderr_received = b''
|
||||
lines = data.splitlines(keepends=True)
|
||||
if lines and not lines[-1].endswith((b'\r', b'\n')):
|
||||
self.stderr_received = lines.pop()
|
||||
# now we have complete lines in <lines> and any partial line in self.stderr_received.
|
||||
for line in lines:
|
||||
handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences
|
||||
if w:
|
||||
while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
|
||||
if calls:
|
||||
if is_preloaded:
|
||||
assert cmd == 'get', "is_preload is only supported for 'get'"
|
||||
if calls[0]['id'] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(calls.pop(0)['id']))
|
||||
else:
|
||||
args = calls.pop(0)
|
||||
if cmd == 'get' and args['id'] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(args['id']))
|
||||
else:
|
||||
self.msgid += 1
|
||||
waiting_for.append(self.msgid)
|
||||
if self.dictFormat:
|
||||
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
|
||||
else:
|
||||
self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args))))
|
||||
if not self.to_send and self.preload_ids:
|
||||
chunk_id = self.preload_ids.pop(0)
|
||||
args = {'id': chunk_id}
|
||||
self.msgid += 1
|
||||
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
|
||||
if self.dictFormat:
|
||||
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args}))
|
||||
else:
|
||||
self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args))))
|
||||
|
||||
send_buffer()
|
||||
self.ignore_responses |= set(waiting_for) # we lose order here
|
||||
send_buffer()
|
||||
finally:
|
||||
self.ignore_responses |= set(waiting_for) # we lose order here
|
||||
if is_preloaded:
|
||||
for call in calls:
|
||||
chunkid = call['id']
|
||||
if chunkid in self.chunkid_to_msgids:
|
||||
self.ignore_responses.add(pop_preload_msgid(chunkid))
|
||||
|
||||
@api(since=parse_version('1.0.0'),
|
||||
append_only={'since': parse_version('1.0.7'), 'previously': False},
|
||||
|
|
|
|||
Loading…
Reference in a new issue