Merge pull request #7197 from ThomasWaldmann/valid-archive-names

text attributes: validate more strictly, fixes #2290
This commit is contained in:
TW 2022-12-15 19:24:17 +01:00 committed by GitHub
commit d0d21e7928
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 172 additions and 28 deletions

View file

@ -15,7 +15,7 @@ from ..archive import FilesystemObjectProcessors, MetadataCollector, ChunksProce
from ..cache import Cache
from ..constants import * # NOQA
from ..compress import CompressionSpec
from ..helpers import ChunkerParams
from ..helpers import comment_validator, ChunkerParams
from ..helpers import NameSpec, FilesCacheMode
from ..helpers import eval_escapes
from ..helpers import timestamp, archive_ts_now
@ -816,7 +816,12 @@ class CreateMixIn:
archive_group = subparser.add_argument_group("Archive options")
archive_group.add_argument(
"--comment", dest="comment", metavar="COMMENT", default="", help="add a comment text to the archive"
"--comment",
metavar="COMMENT",
dest="comment",
type=comment_validator,
default="",
help="add a comment text to the archive",
)
archive_group.add_argument(
"--timestamp",

View file

@ -114,8 +114,7 @@ class ListMixIn:
help="Format output as JSON Lines. "
"The form of ``--format`` is ignored, "
"but keys used in it are added to the JSON output. "
"Some keys are always present. Note: JSON can only represent text. "
'A "bpath" key is therefore not available.',
"Some keys are always present. Note: JSON can only represent text.",
)
subparser.add_argument("name", metavar="NAME", type=NameSpec, help="specify the archive name")
subparser.add_argument(

View file

@ -5,7 +5,7 @@ from ._common import build_matcher
from ..archive import ArchiveRecreater
from ..constants import * # NOQA
from ..compress import CompressionSpec
from ..helpers import archivename_validator, ChunkerParams
from ..helpers import archivename_validator, comment_validator, ChunkerParams
from ..helpers import timestamp
from ..manifest import Manifest
@ -161,7 +161,12 @@ class RecreateMixIn:
help="write checkpoint every SECONDS seconds (Default: 1800)",
)
archive_group.add_argument(
"--comment", dest="comment", metavar="COMMENT", default=None, help="add a comment text to the archive"
"--comment",
metavar="COMMENT",
dest="comment",
type=comment_validator,
default=None,
help="add a comment text to the archive",
)
archive_group.add_argument(
"--timestamp",

View file

@ -114,7 +114,6 @@ class RListMixIn:
help="Format output as JSON. "
"The form of ``--format`` is ignored, "
"but keys used in it are added to the JSON output. "
"Some keys are always present. Note: JSON can only represent text. "
'A "barchive" key is therefore not available.',
"Some keys are always present. Note: JSON can only represent text.",
)
define_archive_filters_group(subparser)

View file

@ -15,7 +15,7 @@ from ..helpers import dash_open
from ..helpers import msgpack
from ..helpers import create_filter_process
from ..helpers import ChunkIteratorFileWrapper
from ..helpers import ChunkerParams
from ..helpers import comment_validator, ChunkerParams
from ..helpers import NameSpec
from ..helpers import remove_surrogates
from ..helpers import timestamp, archive_ts_now
@ -491,7 +491,12 @@ class TarMixIn:
archive_group = subparser.add_argument_group("Archive options")
archive_group.add_argument(
"--comment", dest="comment", metavar="COMMENT", default="", help="add a comment text to the archive"
"--comment",
metavar="COMMENT",
dest="comment",
type=comment_validator,
default="",
help="add a comment text to the archive",
)
archive_group.add_argument(
"--timestamp",

View file

@ -5,7 +5,7 @@ from ..archive import Archive
from ..constants import * # NOQA
from ..crypto.key import uses_same_id_hash, uses_same_chunker_secret
from ..helpers import EXIT_SUCCESS, EXIT_ERROR, Error
from ..helpers import location_validator, Location
from ..helpers import location_validator, Location, archivename_validator, comment_validator
from ..helpers import format_file_size
from ..manifest import Manifest
@ -39,6 +39,32 @@ class TransferMixIn:
if not archive_names:
return EXIT_SUCCESS
an_errors = []
av = archivename_validator()
for archive_name in archive_names:
try:
av(archive_name)
except argparse.ArgumentTypeError as err:
an_errors.append(str(err))
if an_errors:
self.print_error("Invalid archive names detected, please rename them before transfer:")
for err_msg in an_errors:
self.print_error(err_msg)
return EXIT_ERROR
ac_errors = []
for archive_name in archive_names:
archive = Archive(other_manifest, archive_name)
try:
comment_validator(archive.metadata.get("comment", ""))
except argparse.ArgumentTypeError as err:
ac_errors.append((archive_name, str(err)))
if ac_errors:
self.print_error("Invalid archive comments detected, please fix them before transfer:")
for archive_name, err_msg in ac_errors:
self.print_error(f"{archive_name}: {err_msg}")
return EXIT_ERROR
from .. import upgrade as upgrade_mod
try:

View file

@ -26,7 +26,7 @@ from .parseformat import sizeof_fmt, sizeof_fmt_iec, sizeof_fmt_decimal
from .parseformat import format_line, replace_placeholders, PlaceholderError
from .parseformat import SortBySpec, NameSpec
from .parseformat import format_archive, parse_stringified_list, clean_lines
from .parseformat import Location, location_validator, archivename_validator
from .parseformat import Location, location_validator, archivename_validator, comment_validator, text_validator
from .parseformat import BaseFormatter, ArchiveFormatter, ItemFormatter, file_status
from .parseformat import swidth_slice, ellipsis_truncate
from .parseformat import BorgJsonEncoder, basic_json_data, json_print, json_dump, prepare_dump_dict

View file

@ -540,14 +540,59 @@ def location_validator(proto=None, other=False):
def archivename_validator():
def validator(text):
assert isinstance(text, str)
# we make sure that the archive name can be used as directory name (for borg mount)
text = replace_placeholders(text)
if "/" in text or "::" in text or not text:
raise argparse.ArgumentTypeError('Invalid archive name: "%s"' % text)
MAX_PATH = 260 # Windows default. Since Win10, there is a registry setting LongPathsEnabled to get more.
MAX_DIRNAME = MAX_PATH - len("12345678.123")
SAFETY_MARGIN = 48 # borgfs path: mountpoint / archivename / dir / dir / ... / file
MAX_ARCHIVENAME = MAX_DIRNAME - SAFETY_MARGIN
if not (0 < len(text) <= MAX_ARCHIVENAME):
raise argparse.ArgumentTypeError(f'Invalid archive name: "{text}" [0 < length <= {MAX_ARCHIVENAME}]')
# note: ":" is also a invalid path char on windows, but we can not blacklist it,
# because e.g. our {now} placeholder creates ISO-8601 like output like 2022-12-10T20:47:42 .
invalid_chars = r"/" + r"\"<|>?*" # posix + windows
if re.search(f"[{re.escape(invalid_chars)}]", text):
raise argparse.ArgumentTypeError(
f'Invalid archive name: "{text}" [invalid chars detected matching "{invalid_chars}"]'
)
invalid_ctrl_chars = "".join(chr(i) for i in range(32))
if re.search(f"[{re.escape(invalid_ctrl_chars)}]", text):
raise argparse.ArgumentTypeError(
f'Invalid archive name: "{text}" [invalid control chars detected, ASCII < 32]'
)
if text.startswith(" ") or text.endswith(" "):
raise argparse.ArgumentTypeError(f'Invalid archive name: "{text}" [leading or trailing blanks]')
try:
text.encode("utf-8", errors="strict")
except UnicodeEncodeError:
# looks like text contains surrogate-escapes
raise argparse.ArgumentTypeError(f'Invalid archive name: "{text}" [contains non-unicode characters]')
return text
return validator
def text_validator(*, name, max_length, invalid_ctrl_chars="\0"):
def validator(text):
assert isinstance(text, str)
if not (len(text) <= max_length):
raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [length <= {max_length}]')
if re.search(f"[{re.escape(invalid_ctrl_chars)}]", text):
raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [invalid control chars detected]')
try:
text.encode("utf-8", errors="strict")
except UnicodeEncodeError:
# looks like text contains surrogate-escapes
raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [contains non-unicode characters]')
return text
return validator
comment_validator = text_validator(name="comment", max_length=10000)
class BaseFormatter:
FIXED_KEYS = {
# Formatting aids
@ -571,7 +616,7 @@ class BaseFormatter:
return (
"- NEWLINE: OS dependent line separator\n"
"- NL: alias of NEWLINE\n"
"- NUL: NUL character for creating print0 / xargs -0 like output, see barchive and bpath keys below\n"
"- NUL: NUL character for creating print0 / xargs -0 like output\n"
"- SPACE\n"
"- TAB\n"
"- CR\n"
@ -581,11 +626,9 @@ class BaseFormatter:
class ArchiveFormatter(BaseFormatter):
KEY_DESCRIPTIONS = {
"archive": "archive name interpreted as text (might be missing non-text characters, see barchive)",
"archive": "archive name",
"name": 'alias of "archive"',
"barchive": "verbatim archive name, can contain any character except NUL",
"comment": "archive comment interpreted as text (might be missing non-text characters, see bcomment)",
"bcomment": "verbatim archive comment, can contain any character except NUL",
"comment": "archive comment",
# *start* is the key used by borg-info for this timestamp, this makes the formats more compatible
"start": "time (start) of creation of the archive",
"time": 'alias of "start"',
@ -596,7 +639,7 @@ class ArchiveFormatter(BaseFormatter):
"username": "username of user who created this archive",
}
KEY_GROUPS = (
("archive", "name", "barchive", "comment", "bcomment", "id"),
("archive", "name", "comment", "id"),
("start", "time", "end", "command_line"),
("hostname", "username"),
)
@ -647,7 +690,6 @@ class ArchiveFormatter(BaseFormatter):
"hostname": partial(self.get_meta, "hostname", rs=True),
"username": partial(self.get_meta, "username", rs=True),
"comment": partial(self.get_meta, "comment", rs=True),
"bcomment": partial(self.get_meta, "comment", rs=False),
"end": self.get_ts_end,
"command_line": self.get_cmdline,
}
@ -670,7 +712,6 @@ class ArchiveFormatter(BaseFormatter):
{
"name": remove_surrogates(archive_info.name),
"archive": remove_surrogates(archive_info.name),
"barchive": archive_info.name,
"id": bin_to_hex(archive_info.id),
"time": self.format_time(archive_info.ts),
"start": self.format_time(archive_info.ts),
@ -712,8 +753,7 @@ class ItemFormatter(BaseFormatter):
# shake_* is not provided because it uses an incompatible .digest() method to support variable length.
hash_algorithms = set(hashlib.algorithms_guaranteed).union({"xxh64"}).difference({"shake_128", "shake_256"})
KEY_DESCRIPTIONS = {
"bpath": "verbatim POSIX path, can contain any character except NUL",
"path": "path interpreted as text (might be missing non-text characters, see bpath)",
"path": "file path",
"source": "link target for symlinks (identical to linktarget)",
"hlid": "hard link identity (same if hardlinking same fs object)",
"extra": 'prepends {source} with " -> " for soft links and " link to " for hard links',
@ -724,7 +764,7 @@ class ItemFormatter(BaseFormatter):
"health": 'either "healthy" (file ok) or "broken" (if file has all-zero replacement chunks)',
}
KEY_GROUPS = (
("type", "mode", "uid", "gid", "user", "group", "path", "bpath", "source", "linktarget", "hlid", "flags"),
("type", "mode", "uid", "gid", "user", "group", "path", "source", "linktarget", "hlid", "flags"),
("size", "dsize", "num_chunks", "unique_chunks"),
("mtime", "ctime", "atime", "isomtime", "isoctime", "isoatime"),
tuple(sorted(hash_algorithms)),
@ -828,7 +868,6 @@ class ItemFormatter(BaseFormatter):
if self.json_lines:
item_data["healthy"] = "chunks_healthy" not in item
else:
item_data["bpath"] = item.path
item_data["extra"] = extra
item_data["health"] = "broken" if "chunks_healthy" in item else "healthy"
item_data["source"] = source

View file

@ -35,8 +35,6 @@ class ArchiverTestCase(ArchiverTestCaseBase):
self.assertEqual(output_1, output_2)
output_1 = self.cmd(f"--repo={self.repository_location}", "rlist", "--short")
self.assertEqual(output_1, "test-1\ntest-2\n")
output_1 = self.cmd(f"--repo={self.repository_location}", "rlist", "--format", "{barchive}/")
self.assertEqual(output_1, "test-1/test-2/")
output_3 = self.cmd(f"--repo={self.repository_location}", "rlist", "--format", "{name} {comment}{NL}")
self.assert_in("test-1 comment 1\n", output_3)
self.assert_in("test-2 comment 2\n", output_3)

View file

@ -81,6 +81,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
for got_archive, expected_archive in zip(got["archives"], expected["archives"]):
del got_archive["id"]
del expected_archive["id"]
del expected_archive["barchive"]
# timestamps:
# borg 1.2 transformed to local time and had microseconds = 0, no tzoffset
# borg 2 uses an utc timestamp, with microseconds and with tzoffset

View file

@ -32,6 +32,7 @@ from ..helpers import msgpack
from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
from ..helpers import StableDict, bin_to_hex
from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams
from ..helpers import archivename_validator, text_validator
from ..helpers import ProgressIndicatorPercent, ProgressIndicatorEndless
from ..helpers import swidth_slice
from ..helpers import chunkit
@ -246,6 +247,72 @@ class TestLocationWithoutEnv:
Location("ssh://user@host:/path")
@pytest.mark.parametrize(
"name",
[
"foobar",
# placeholders
"foobar-{now}",
],
)
def test_archivename_ok(name):
av = archivename_validator()
av(name) # must not raise an exception
@pytest.mark.parametrize(
"name",
[
"", # too short
"x" * 201, # too long
# invalid chars:
"foo/bar",
"foo\\bar",
">foo",
"<foo",
"|foo",
'foo"bar',
"foo?",
"*bar",
"foo\nbar",
"foo\0bar",
# leading/trailing blanks
" foo",
"bar ",
# contains surrogate-escapes
"foo\udc80bar",
"foo\udcffbar",
],
)
def test_archivename_invalid(name):
av = archivename_validator()
with pytest.raises(ArgumentTypeError):
av(name)
@pytest.mark.parametrize("text", ["", "single line", "multi\nline\ncomment"])
def test_text_ok(text):
tv = text_validator(max_length=100, name="name")
tv(text) # must not raise an exception
@pytest.mark.parametrize(
"text",
[
"x" * 101, # too long
# invalid chars:
"foo\0bar",
# contains surrogate-escapes
"foo\udc80bar",
"foo\udcffbar",
],
)
def test_text_invalid(text):
tv = text_validator(max_length=100, name="name")
with pytest.raises(ArgumentTypeError):
tv(text)
class FormatTimedeltaTestCase(BaseTestCase):
def test(self):
t0 = datetime(2001, 1, 1, 10, 20, 3, 0)