sane remote logging, remote stderr, fixes #461

This commit is contained in:
Thomas Waldmann 2015-12-12 21:24:21 +01:00
parent 229512b6f5
commit 2e2e145372
3 changed files with 46 additions and 21 deletions

View file

@ -1191,7 +1191,7 @@ class Archiver:
def run(self, args):
os.umask(args.umask) # early, before opening files
self.lock_wait = args.lock_wait
setup_logging(level=args.log_level) # do not use loggers before this!
setup_logging(level=args.log_level, is_serve=args.func == self.do_serve) # do not use loggers before this!
check_extension_modules()
keys_dir = get_keys_dir()
if not os.path.exists(keys_dir):

View file

@ -52,7 +52,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None):
logger.warning(msg)
def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', level='info'):
def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', level='info', is_serve=False):
"""setup logging module according to the arguments provided
if conf_fname is given (or the config file name can be determined via
@ -60,6 +60,9 @@ def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', lev
otherwise, set up a stream handler logger on stderr (by default, if no
stream is provided).
if is_serve == True, we configure a special log format as expected by
the borg client log message interceptor.
"""
global configured
err_msg = None
@ -84,9 +87,11 @@ def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', lev
# if we did not / not successfully load a logging configuration, fallback to this:
logger = logging.getLogger('')
handler = logging.StreamHandler(stream)
# other formatters will probably want this, but let's remove clutter on stderr
# example:
# handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
if is_serve:
fmt = '$LOG %(levelname)s Remote: %(message)s'
else:
fmt = '%(message)s'
handler.setFormatter(logging.Formatter(fmt))
logger.addHandler(handler)
logger.setLevel(level.upper())
configured = True

View file

@ -63,12 +63,16 @@ class RepositoryServer: # pragma: no cover
def serve(self):
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
# Make stdin non-blocking
fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# Make stdout blocking
fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL)
fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
# Make stderr blocking
fl = fcntl.fcntl(stderr_fd, fcntl.F_GETFL)
fcntl.fcntl(stderr_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
unpacker = msgpack.Unpacker(use_list=False)
while True:
r, w, es = select.select([stdin_fd], [], [], 10)
@ -91,6 +95,7 @@ class RepositoryServer: # pragma: no cover
f = getattr(self.repository, method)
res = f(*args)
except BaseException as e:
# XXX rather log exception
exc = "Remote Traceback by Borg %s%s%s" % (__version__, os.linesep, traceback.format_exc())
os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
else:
@ -137,13 +142,15 @@ class RemoteRepository:
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE)
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
fcntl.fcntl(self.stdin_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdin_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
fcntl.fcntl(self.stdout_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdout_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
self.r_fds = [self.stdout_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd]
fcntl.fcntl(self.stderr_fd, fcntl.F_SETFL, fcntl.fcntl(self.stderr_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
self.r_fds = [self.stdout_fd, self.stderr_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
try:
version = self.call('negotiate', RPC_PROTOCOL_VERSION)
@ -238,19 +245,32 @@ class RemoteRepository:
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
if x:
raise Exception('FD exception occurred')
if r:
data = os.read(self.stdout_fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
raise Exception("Unexpected RPC data format.")
type, msgid, error, res = unpacked
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
else:
self.responses[msgid] = error, res
for fd in r:
if fd is self.stdout_fd:
data = os.read(fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
raise Exception("Unexpected RPC data format.")
type, msgid, error, res = unpacked
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
else:
self.responses[msgid] = error, res
elif fd is self.stderr_fd:
data = os.read(fd, 32768)
if not data:
raise ConnectionClosed()
data = data.decode('utf-8')
for line in data.splitlines():
if line.startswith('$LOG '):
_, level, msg = line.split(' ', 2)
level = getattr(logging, level, logging.CRITICAL) # str -> int
logging.log(level, msg)
else:
print("Remote: " + line, file=sys.stderr)
if w:
while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < 100:
if calls: