Merge pull request #6862 from ThomasWaldmann/mypy-borg2

add mypy
This commit is contained in:
TW 2022-07-15 15:23:07 +02:00 committed by GitHub
commit 65703df839
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 684 additions and 129 deletions

View file

@ -45,12 +45,15 @@ jobs:
pip install flake8
flake8 src scripts conftest.py
pytest:
tox:
needs: lint
strategy:
matrix:
include:
- os: ubuntu-20.04
python-version: '3.9'
toxenv: mypy
- os: ubuntu-20.04
python-version: '3.9'
toxenv: py39-fuse2
@ -117,7 +120,7 @@ jobs:
run: |
# pip install -e .
python setup.py -v develop
- name: run pytest via tox
- name: run tox env
run: |
# do not use fakeroot, but run as root. avoids the dreaded EISDIR sporadic failures. see #2482.
#sudo -E bash -c "tox -e py"

View file

@ -177,3 +177,23 @@ per_file_ignores =
max_line_length = 120
exclude = build,dist,.git,.idea,.cache,.tox
[mypy]
python_version = 3.9
strict_optional = False
local_partial_types = True
show_error_codes = True
files = src/borg/**/*.py
[mypy-msgpack.*]
ignore_missing_imports = True
[mypy-llfuse]
ignore_missing_imports = True
[mypy-pyfuse3]
ignore_missing_imports = True
[mypy-trio]
ignore_missing_imports = True
[mypy-borg.crypto.low_level]
ignore_missing_imports = True
[mypy-borg.platform.*]
ignore_missing_imports = True

View file

@ -4,7 +4,7 @@ from ._version import version as __version__
_v = parse_version(__version__)
__version_tuple__ = _v._version.release
__version_tuple__ = _v._version.release # type: ignore
# assert that all semver components are integers
# this is mainly to show errors when people repackage poorly

View file

@ -1109,7 +1109,7 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
def __exit__(self, exc_type, exc_val, exc_tb):
pass
files = None
files = None # type: ignore
cache_mode = "d"
def file_known_and_unchanged(self, hashed_path, path_hash, st):

8
src/borg/checksums.pyi Normal file
View file

@ -0,0 +1,8 @@
def crc32(data: bytes, value: int = 0) -> int: ...
def xxh64(data: bytes, seed: int = 0) -> bytes: ...
class StreamingXXH64:
def __init__(self, seed: int = 0) -> None: ...
def update(self, data: bytes) -> None: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...

28
src/borg/chunker.pyi Normal file
View file

@ -0,0 +1,28 @@
from typing import NamedTuple, Tuple, List, Dict, Any, Type, Iterator, BinaryIO
API_VERSION: str
has_seek_hole: bool
class _Chunk(NamedTuple):
data: bytes
meta: Dict[str, Any]
def Chunk(data: bytes, **meta) -> Type[_Chunk]: ...
def buzhash(data: bytes, seed: int) -> int: ...
def buzhash_update(sum: int, remove: int, add: int, len: int, seed: int) -> int: ...
def get_chunker(algo: str, *params, **kw) -> Any: ...
fmap_entry = Tuple[int, int, bool]
def sparsemap(fd: BinaryIO = None, fh: int = -1) -> List[fmap_entry]: ...
class ChunkerFixed:
def __init__(self, block_size: int, header_size: int = 0, sparse: bool = False) -> None: ...
def chunkify(self, fd: BinaryIO = None, fh: int = -1, fmap: List[fmap_entry] = None) -> Iterator: ...
class Chunker:
def __init__(
self, seed: int, chunk_min_exp: int, chunk_max_exp: int, hash_mask_bits: int, hash_window_size: int
) -> None: ...
def chunkify(self, fd: BinaryIO = None, fh: int = -1) -> Iterator: ...

63
src/borg/compress.pyi Normal file
View file

@ -0,0 +1,63 @@
from typing import Any, Type
API_VERSION: str
def get_compressor(name: str, **kwargs) -> Any: ...
class CompressionSpec:
def __init__(self, spec: str) -> None: ...
@property
def compressor(self) -> Any: ...
inner: CompressionSpec
class Compressor:
def __init__(self, name: Any = ..., **kwargs) -> None: ...
def compress(self, data: bytes) -> bytes: ...
def decompress(self, data: bytes) -> bytes: ...
@staticmethod
def detect(data: bytes) -> Any: ...
class CompressorBase:
ID: bytes = ...
name: str = ...
@classmethod
def detect(self, data: bytes) -> bool: ...
def __init__(self, level: int = ..., **kwargs) -> None: ...
def decide(self, data: bytes) -> Any: ...
def compress(self, data: bytes) -> bytes: ...
def decompress(self, data: bytes) -> bytes: ...
class Auto(CompressorBase):
def __init__(self, compressor: Any) -> None: ...
class DecidingCompressor(CompressorBase):
def __init__(self, level: int = ..., **kwargs) -> None: ...
def decide_compress(self, data: bytes) -> Any: ...
class CNONE(CompressorBase):
def __init__(self, level: int = ..., **kwargs) -> None: ...
class ObfuscateSize(CompressorBase):
def __init__(self, level: int = ..., compressor: Any = ...) -> None: ...
class ZLIB_legacy(CompressorBase):
def __init__(self, level: int = ..., **kwargs) -> None: ...
level: int
class ZLIB(CompressorBase):
def __init__(self, level: int = ..., **kwargs) -> None: ...
level: int
class LZ4(DecidingCompressor):
def __init__(self, level: int = ..., **kwargs) -> None: ...
class LZMA(DecidingCompressor):
def __init__(self, level: int = ..., **kwargs) -> None: ...
level: int
class ZSTD(DecidingCompressor):
def __init__(self, level: int = ..., **kwargs) -> None: ...
level: int
LZ4_COMPRESSOR: Type[LZ4]
NONE_COMPRESSOR: Type[CNONE]

View file

@ -3,6 +3,7 @@ import io
import json
import os
from hmac import compare_digest
from typing import Callable
from ..helpers import IntegrityError
from ..logger import create_logger
@ -54,8 +55,8 @@ class FileHashingWrapper(FileLikeWrapper):
are illegal.
"""
ALGORITHM = None
FACTORY = None
ALGORITHM: str = None
FACTORY: Callable = None
def __init__(self, backing_fd, write):
self.fd = backing_fd

View file

@ -3,7 +3,7 @@ import os
import textwrap
from binascii import a2b_base64, b2a_base64, hexlify
from hashlib import sha256, pbkdf2_hmac
from typing import Literal
from typing import Literal, Callable, Sequence
from ..logger import create_logger
@ -139,9 +139,9 @@ def uses_same_id_hash(other_key, key):
class KeyBase:
# Numeric key type ID, must fit in one byte.
TYPE = None # override in subclasses
TYPE: int = None # override in subclasses
# set of key type IDs the class can handle as input
TYPES_ACCEPTABLE = None # override in subclasses
TYPES_ACCEPTABLE: set[int] = None # override in subclasses
# Human-readable name
NAME = "UNDEFINED"
@ -153,8 +153,8 @@ class KeyBase:
STORAGE = KeyBlobStorage.NO_STORAGE
# Seed for the buzhash chunker (borg.algorithms.chunker.Chunker)
# type: int
chunk_seed = None
# type is int
chunk_seed: int = None
# Whether this *particular instance* is encrypted from a practical point of view,
# i.e. when it's using encryption with a empty passphrase, then
@ -356,7 +356,7 @@ class AESKeyBase(KeyBase):
PAYLOAD_OVERHEAD = 1 + 32 + 8 # TYPE + HMAC + NONCE
CIPHERSUITE = None # override in derived class
CIPHERSUITE: Callable = None # override in derived class
logically_encrypted = True
@ -839,7 +839,7 @@ class AEADKeyBase(KeyBase):
PAYLOAD_OVERHEAD = 1 + 1 + 6 + 24 + 16 # [bytes], see Layout
CIPHERSUITE = None # override in subclass
CIPHERSUITE: Callable = None # override in subclass
logically_encrypted = True

View file

@ -706,7 +706,7 @@ class FuseOperations(llfuse.Operations, FuseBackend):
# note: we can't have a generator (with yield) and not a generator (async) in the same method
if has_pyfuse3:
async def readdir(self, fh, off, token):
async def readdir(self, fh, off, token): # type: ignore[misc]
entries = [(b".", fh), (b"..", self.parent[fh])]
entries.extend(self.contents[fh].items())
for i, (name, inode) in enumerate(entries[off:], off):
@ -716,7 +716,7 @@ class FuseOperations(llfuse.Operations, FuseBackend):
else:
def readdir(self, fh, off):
def readdir(self, fh, off): # type: ignore[misc]
entries = [(b".", fh), (b"..", self.parent[fh])]
entries.extend(self.contents[fh].items())
for i, (name, inode) in enumerate(entries[off:], off):

88
src/borg/hashindex.pyi Normal file
View file

@ -0,0 +1,88 @@
from typing import NamedTuple, Tuple, Type, Union, IO, Iterator, Any
API_VERSION: str
PATH_OR_FILE = Union[str, IO]
def hashindex_variant(fn: str) -> str: ...
class IndexBase:
value_size: int
MAX_VALUE: int
MAX_LOAD_FACTOR: int
def __init__(
self, capacity: int = ..., path: PATH_OR_FILE = ..., permit_compact: bool = ..., usable: Union[int, float] = ...
): ...
@classmethod
def read(cls, path: PATH_OR_FILE, permit_compact: bool = False): ...
def write(self, path: PATH_OR_FILE) -> None: ...
def clear(self) -> None: ...
def setdefault(self, key: bytes, value: bytes) -> None: ...
def __delitem__(self, key: bytes) -> None: ...
def get(self, key: bytes, default: Any = ...) -> Any: ...
def pop(self, key: bytes, default: Any = ...) -> Any: ...
def __len__(self) -> int: ...
def size(self) -> int: ...
def compact(self) -> Any: ...
class ChunkIndexEntry(NamedTuple):
refcount: int
size: int
csize: int
CIE = Union[Tuple[int, int, int], Type[ChunkIndexEntry]]
class ChunkKeyIterator:
def __init__(self, keysize: int) -> None: ...
def __iter__(self) -> Iterator: ...
def __next__(self) -> Tuple[bytes, Type[ChunkIndexEntry]]: ...
class ChunkIndex(IndexBase):
def add(self, key: bytes, refs: int, size: int, csize: int) -> None: ...
def decref(self, key: bytes) -> CIE: ...
def incref(self, key: bytes) -> CIE: ...
def iteritems(self, marker: bytes = ...) -> Iterator: ...
def merge(self, other_index) -> None: ...
def stats_against(self, master_index) -> Tuple: ...
def summarize(self) -> Tuple: ...
def zero_csize_ids(self) -> int: ...
def __contains__(self, key: bytes) -> bool: ...
def __getitem__(self, key: bytes) -> Type[ChunkIndexEntry]: ...
def __setitem__(self, key: bytes, value: CIE) -> None: ...
class NSIndexEntry(NamedTuple):
segment: int
offset: int
size: int
class NSKeyIterator:
def __init__(self, keysize: int) -> None: ...
def __iter__(self) -> Iterator: ...
def __next__(self) -> Tuple[bytes, Type[Any]]: ...
class NSIndex(IndexBase):
def iteritems(self, *args, **kwargs) -> Iterator: ...
def __contains__(self, key: bytes) -> bool: ...
def __getitem__(self, key: bytes) -> Any: ...
def __setitem__(self, key: bytes, value: Any) -> None: ...
class NSIndex1(IndexBase): # legacy
def iteritems(self, *args, **kwargs) -> Iterator: ...
def __contains__(self, key: bytes) -> bool: ...
def __getitem__(self, key: bytes) -> Any: ...
def __setitem__(self, key: bytes, value: Any) -> None: ...
class FuseVersionsIndex(IndexBase):
def __contains__(self, key: bytes) -> bool: ...
def __getitem__(self, key: bytes) -> Any: ...
def __setitem__(self, key: bytes, value: Any) -> None: ...
class CacheSynchronizer:
csize_parts: int
csize_totals: int
num_files_parts: int
num_files_totals: int
size_parts: int
size_totals: int
def __init__(self, chunks_index: Any) -> None: ...
def feed(self, chunk: bytes) -> None: ...

View file

@ -5,18 +5,38 @@ that did not fit better elsewhere.
Code used to be in borg/helpers.py but was split into the modules in this
package, which are imported into here for compatibility.
"""
import os
from .checks import * # NOQA
from .datastruct import * # NOQA
from .errors import * # NOQA
from .fs import * # NOQA
from .manifest import * # NOQA
from .misc import * # NOQA
from .parseformat import * # NOQA
from .process import * # NOQA
from .progress import * # NOQA
from .time import * # NOQA
from .yes import * # NOQA
from ..constants import * # NOQA
from .checks import check_extension_modules, check_python
from .datastruct import StableDict, Buffer, EfficientCollectionQueue
from .errors import Error, ErrorWithTraceback, IntegrityError, DecompressionError
from .fs import ensure_dir, get_security_dir, get_keys_dir, get_base_dir, get_cache_dir, get_config_dir
from .fs import dir_is_tagged, dir_is_cachedir, make_path_safe, scandir_inorder
from .fs import secure_erase, safe_unlink, dash_open, os_open, os_stat, umount
from .fs import O_, flags_root, flags_dir, flags_special_follow, flags_special, flags_base, flags_normal, flags_noatime
from .fs import HardLinkManager
from .manifest import Manifest, NoManifestError, MandatoryFeatureUnsupported, AI_HUMAN_SORT_KEYS
from .misc import prune_within, prune_split, PRUNING_PATTERNS, sysinfo, log_multi, consume, get_tar_filter
from .misc import ChunkIteratorFileWrapper, open_item, chunkit, iter_separated, ErrorIgnoringTextIOWrapper
from .parseformat import bin_to_hex, safe_encode, safe_decode
from .parseformat import remove_surrogates, eval_escapes, decode_dict, positive_int_validator, interval
from .parseformat import ChunkerParams, FilesCacheMode, partial_format, DatetimeWrapper
from .parseformat import format_file_size, parse_file_size, FileSize, parse_storage_quota
from .parseformat import sizeof_fmt, sizeof_fmt_iec, sizeof_fmt_decimal
from .parseformat import format_line, replace_placeholders, PlaceholderError
from .parseformat import PrefixSpec, GlobSpec, CommentSpec, SortBySpec, NameSpec
from .parseformat import format_archive, parse_stringified_list, clean_lines
from .parseformat import Location, location_validator, archivename_validator
from .parseformat import BaseFormatter, ArchiveFormatter, ItemFormatter, file_status
from .parseformat import swidth_slice, ellipsis_truncate
from .parseformat import BorgJsonEncoder, basic_json_data, json_print, json_dump, prepare_dump_dict
from .process import daemonize, daemonizing, signal_handler, raising_signal_handler, sig_int, SigHup, SigTerm
from .process import popen_with_error_handling, is_terminal, prepare_subprocess_env, create_filter_process
from .progress import ProgressIndicatorPercent, ProgressIndicatorEndless, ProgressIndicatorMessage
from .time import parse_timestamp, timestamp, safe_timestamp, safe_s, safe_ns, MAX_S, SUPPORT_32BIT_PLATFORMS
from .time import format_time, format_timedelta, isoformat_time, to_localtime, OutputTimestamp
from .yes_no import yes, TRUISH, FALSISH, DEFAULTISH
from .msgpack import is_slow_msgpack, is_supported_msgpack, get_limited_unpacker
from . import msgpack

View file

@ -5,6 +5,7 @@ import re
from collections import abc, namedtuple
from datetime import datetime, timedelta
from operator import attrgetter
from typing import Sequence, FrozenSet
from .errors import Error
@ -158,9 +159,9 @@ class Manifest:
# count and the need to be able to find all (directly and indirectly) referenced chunks of a given archive.
DELETE = "delete"
NO_OPERATION_CHECK = tuple()
NO_OPERATION_CHECK: Sequence[Operation] = tuple()
SUPPORTED_REPO_FEATURES = frozenset([])
SUPPORTED_REPO_FEATURES: FrozenSet[str] = frozenset([])
MANIFEST_ID = b"\0" * 32

View file

@ -236,12 +236,12 @@ def iter_separated(fd, sep=None, read_size=4096):
sep = sep or ("\n" if is_str else b"\n")
while len(buf) > 0:
part2, *items = buf.split(sep)
*full, part = (part + part2, *items)
*full, part = (part + part2, *items) # type: ignore
yield from full
buf = fd.read(read_size)
# won't yield an empty part if stream ended with `sep`
# or if there was no data before EOF
if len(part) > 0:
if len(part) > 0: # type: ignore[arg-type]
yield part

View file

@ -694,7 +694,7 @@ class ArchiveFormatter(BaseFormatter):
class ItemFormatter(BaseFormatter):
# we provide the hash algos from python stdlib (except shake_*) and additionally xxh64.
# shake_* is not provided because it uses an incompatible .digest() method to support variable length.
hash_algorithms = hashlib.algorithms_guaranteed.union({"xxh64"}).difference({"shake_128", "shake_256"})
hash_algorithms = set(hashlib.algorithms_guaranteed).union({"xxh64"}).difference({"shake_128", "shake_256"})
KEY_DESCRIPTIONS = {
"bpath": "verbatim POSIX path, can contain any character except NUL",
"path": "path interpreted as text (might be missing non-text characters, see bpath)",

View file

@ -21,7 +21,7 @@ def justify_to_terminal_size(message):
class ProgressIndicatorBase:
LOGGER = "borg.output.progress"
JSON_TYPE = None
JSON_TYPE: str = None
json = False
operation_id_counter = 0

305
src/borg/item.pyi Normal file
View file

@ -0,0 +1,305 @@
from typing import FrozenSet, Set, NamedTuple, Tuple, Mapping, Dict, List, Iterator, Callable, Any
from .helpers import StableDict
API_VERSION: str
def want_bytes(v: Any, *, errors: str) -> bytes: ...
def chunks_contents_equal(chunks1: Iterator, chunks2: Iterator) -> bool: ...
class PropDict:
VALID_KEYS: Set[str] = ...
def __init__(self, data_dict: dict = None, internal_dict: dict = None, **kw) -> None: ...
def as_dict(self) -> StableDict: ...
def get(self, key: str, default: Any = None) -> Any: ...
def update(self, d: dict) -> None: ...
def update_internal(self, d: dict) -> None: ...
def __contains__(self, key: str) -> bool: ...
def __eq__(self, other: object) -> bool: ...
class ArchiveItem(PropDict):
@property
def version(self) -> int: ...
@version.setter
def version(self, val: int) -> None: ...
@property
def name(self) -> str: ...
@name.setter
def name(self, val: str) -> None: ...
@property
def time(self) -> str: ...
@time.setter
def time(self, val: str) -> None: ...
@property
def time_end(self) -> str: ...
@time_end.setter
def time_end(self, val: str) -> None: ...
@property
def username(self) -> str: ...
@username.setter
def username(self, val: str) -> None: ...
@property
def hostname(self) -> str: ...
@hostname.setter
def hostname(self, val: str) -> None: ...
@property
def comment(self) -> str: ...
@comment.setter
def comment(self, val: str) -> None: ...
@property
def chunker_params(self) -> Tuple: ...
@chunker_params.setter
def chunker_params(self, val: Tuple) -> None: ...
@property
def cmdline(self) -> List[str]: ...
@cmdline.setter
def cmdline(self, val: List[str]) -> None: ...
@property
def recreate_cmdline(self) -> List[str]: ...
@recreate_cmdline.setter
def recreate_cmdline(self, val: List[str]) -> None: ...
@property
def recreate_args(self) -> Any: ...
@recreate_args.setter
def recreate_args(self, val: Any) -> None: ...
@property
def recreate_partial_chunks(self) -> Any: ...
@recreate_partial_chunks.setter
def recreate_partial_chunks(self, val: Any) -> None: ...
@property
def recreate_source_id(self) -> Any: ...
@recreate_source_id.setter
def recreate_source_id(self, val: Any) -> None: ...
@property
def nfiles(self) -> int: ...
@nfiles.setter
def nfiles(self, val: int) -> None: ...
@property
def nfiles_parts(self) -> int: ...
@nfiles_parts.setter
def nfiles_parts(self, val: int) -> None: ...
@property
def size(self) -> int: ...
@size.setter
def size(self, val: int) -> None: ...
@property
def size_parts(self) -> int: ...
@size_parts.setter
def size_parts(self, val: int) -> None: ...
@property
def csize(self) -> int: ...
@csize.setter
def csize(self, val: int) -> None: ...
@property
def csize_parts(self) -> int: ...
@csize_parts.setter
def csize_parts(self, val: int) -> None: ...
@property
def items(self) -> List: ...
@items.setter
def items(self, val: List) -> None: ...
class ChunkListEntry(NamedTuple):
id: bytes
size: int
csize: int
class Item(PropDict):
@property
def path(self) -> str: ...
@path.setter
def path(self, val: str) -> None: ...
@property
def source(self) -> str: ...
@source.setter
def source(self, val: str) -> None: ...
def is_dir(self) -> bool: ...
def is_link(self) -> bool: ...
def _is_type(self, typetest: Callable) -> bool: ...
@classmethod
def create_deleted(self, path) -> Item: ...
@classmethod
def from_optr(self, optr: Any) -> Item: ...
def to_optr(self) -> Any: ...
@property
def atime(self) -> int: ...
@atime.setter
def atime(self, val: int) -> None: ...
@property
def ctime(self) -> int: ...
@ctime.setter
def ctime(self, val: int) -> None: ...
@property
def mtime(self) -> int: ...
@mtime.setter
def mtime(self, val: int) -> None: ...
@property
def birthtime(self) -> int: ...
@birthtime.setter
def birthtime(self, val: int) -> None: ...
@property
def xattrs(self) -> StableDict: ...
@xattrs.setter
def xattrs(self, val: StableDict) -> None: ...
@property
def acl_access(self) -> bytes: ...
@acl_access.setter
def acl_access(self, val: bytes) -> None: ...
@property
def acl_default(self) -> bytes: ...
@acl_default.setter
def acl_default(self, val: bytes) -> None: ...
@property
def acl_extended(self) -> bytes: ...
@acl_extended.setter
def acl_extended(self, val: bytes) -> None: ...
@property
def acl_nfs4(self) -> bytes: ...
@acl_nfs4.setter
def acl_nfs4(self, val: bytes) -> None: ...
@property
def bsdflags(self) -> int: ...
@bsdflags.setter
def bsdflags(self, val: int) -> None: ...
@property
def chunks(self) -> List: ...
@chunks.setter
def chunks(self, val: List) -> None: ...
@property
def chunks_healthy(self) -> List: ...
@chunks_healthy.setter
def chunks_healthy(self, val: List) -> None: ...
@property
def deleted(self) -> bool: ...
@deleted.setter
def deleted(self, val: bool) -> None: ...
@property
def hardlink_master(self) -> bool: ...
@hardlink_master.setter
def hardlink_master(self, val: bool) -> None: ...
@property
def uid(self) -> int: ...
@uid.setter
def uid(self, val: int) -> None: ...
@property
def gid(self) -> int: ...
@gid.setter
def gid(self, val: int) -> None: ...
@property
def user(self) -> str: ...
@user.setter
def user(self, val: str) -> None: ...
@property
def group(self) -> str: ...
@group.setter
def group(self, val: str) -> None: ...
@property
def mode(self) -> int: ...
@mode.setter
def mode(self, val: int) -> None: ...
@property
def rdev(self) -> int: ...
@rdev.setter
def rdev(self, val: int) -> None: ...
@property
def nlink(self) -> int: ...
@nlink.setter
def nlink(self, val: int) -> None: ...
@property
def size(self) -> int: ...
@size.setter
def size(self, val: int) -> None: ...
def get_size(
self,
hardlink_masters=...,
memorize: bool = ...,
compressed: bool = ...,
from_chunks: bool = ...,
consider_ids: List[bytes] = ...,
) -> int: ...
@property
def part(self) -> int: ...
@part.setter
def part(self, val: int) -> None: ...
class ManifestItem(PropDict):
@property
def version(self) -> int: ...
@version.setter
def version(self, val: int) -> None: ...
@property
def timestamp(self) -> str: ...
@timestamp.setter
def timestamp(self, val: str) -> None: ...
@property
def archives(self) -> Mapping[bytes, dict]: ...
@archives.setter
def archives(self, val: Mapping[bytes, dict]) -> None: ...
@property
def config(self) -> Dict: ...
@config.setter
def config(self, val: Dict) -> None: ...
@property
def item_keys(self) -> Tuple: ...
@item_keys.setter
def item_keys(self, val: Tuple) -> None: ...
class ItemDiff:
def __init__(self, *args, **kwargs) -> None: ...
def _chunk_content_equal(self, c1: Iterator, c2: Iterator) -> bool: ...
class Key(PropDict):
@property
def version(self) -> int: ...
@version.setter
def version(self, val: int) -> None: ...
@property
def chunk_seed(self) -> int: ...
@chunk_seed.setter
def chunk_seed(self, val: int) -> None: ...
@property
def tam_required(self) -> bool: ...
@tam_required.setter
def tam_required(self, val: bool) -> None: ...
@property
def enc_hmac_key(self) -> bytes: ...
@enc_hmac_key.setter
def enc_hmac_key(self, val: bytes) -> None: ...
@property
def enc_key(self) -> bytes: ...
@enc_key.setter
def enc_key(self, val: bytes) -> None: ...
@property
def id_key(self) -> bytes: ...
@id_key.setter
def id_key(self, val: bytes) -> None: ...
@property
def repository_id(self) -> bytes: ...
@repository_id.setter
def repository_id(self, val: bytes) -> None: ...
class EncryptedKey(PropDict):
@property
def version(self) -> int: ...
@version.setter
def version(self, val: int) -> None: ...
@property
def algorithm(self) -> str: ...
@algorithm.setter
def algorithm(self, val: str) -> None: ...
@property
def salt(self) -> bytes: ...
@salt.setter
def salt(self, val: bytes) -> None: ...
@property
def iterations(self) -> int: ...
@iterations.setter
def iterations(self, val: int) -> None: ...
@property
def data(self) -> bytes: ...
@data.setter
def data(self, val: bytes) -> None: ...
@property
def hash(self) -> bytes: ...
@hash.setter
def hash(self, val: bytes) -> None: ...

View file

@ -135,7 +135,64 @@ def find_parent_module():
return __name__
def create_logger(name=None):
class LazyLogger:
def __init__(self, name=None):
self.__name = name or find_parent_module()
self.__real_logger = None
@property
def __logger(self):
if self.__real_logger is None:
if not configured:
raise Exception("tried to call a logger before setup_logging() was called")
self.__real_logger = logging.getLogger(self.__name)
if self.__name.startswith("borg.debug.") and self.__real_logger.level == logging.NOTSET:
self.__real_logger.setLevel("WARNING")
return self.__real_logger
def getChild(self, suffix):
return LazyLogger(self.__name + "." + suffix)
def setLevel(self, *args, **kw):
return self.__logger.setLevel(*args, **kw)
def log(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.log(*args, **kw)
def exception(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.exception(*args, **kw)
def debug(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.debug(*args, **kw)
def info(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.info(*args, **kw)
def warning(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.warning(*args, **kw)
def error(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.error(*args, **kw)
def critical(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.critical(*args, **kw)
def create_logger(name: str = None) -> LazyLogger:
"""lazily create a Logger object with the proper path, which is returned by
find_parent_module() by default, or is provided via the commandline
@ -152,62 +209,6 @@ def create_logger(name=None):
If you try, you'll get an exception.
"""
class LazyLogger:
def __init__(self, name=None):
self.__name = name or find_parent_module()
self.__real_logger = None
@property
def __logger(self):
if self.__real_logger is None:
if not configured:
raise Exception("tried to call a logger before setup_logging() was called")
self.__real_logger = logging.getLogger(self.__name)
if self.__name.startswith("borg.debug.") and self.__real_logger.level == logging.NOTSET:
self.__real_logger.setLevel("WARNING")
return self.__real_logger
def getChild(self, suffix):
return LazyLogger(self.__name + "." + suffix)
def setLevel(self, *args, **kw):
return self.__logger.setLevel(*args, **kw)
def log(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.log(*args, **kw)
def exception(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.exception(*args, **kw)
def debug(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.debug(*args, **kw)
def info(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.info(*args, **kw)
def warning(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.warning(*args, **kw)
def error(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.error(*args, **kw)
def critical(self, *args, **kw):
if "msgid" in kw:
kw.setdefault("extra", {})["msgid"] = kw.pop("msgid")
return self.__logger.critical(*args, **kw)
return LazyLogger(name)

View file

@ -172,7 +172,7 @@ def normalize_path(path):
class PatternBase:
"""Shared logic for inclusion/exclusion patterns."""
PREFIX = NotImplemented
PREFIX: str = None
def __init__(self, pattern, recurse_dir=False):
self.pattern_orig = pattern

View file

@ -6,26 +6,9 @@ Public APIs are documented in platform.base.
from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin
from .base import listxattr, getxattr, setxattr, ENOATTR
from .base import acl_get, acl_set
from .base import set_flags, get_flags
from .base import SaveFile, SyncFile, sync_dir, fdatasync, safe_fadvise
from .base import swidth, API_VERSION
from .base import process_alive, get_process_id, local_pid_alive, fqdn, hostname, hostid
OS_API_VERSION = API_VERSION
if not is_win32:
from .posix import process_alive, local_pid_alive
# posix swidth implementation works for: linux, freebsd, darwin, openindiana, cygwin
from .posix import swidth
from .posix import get_errno
from .posix import uid2user, user2uid, gid2group, group2gid, getosusername
else:
from .windows import process_alive, local_pid_alive
from .windows import uid2user, user2uid, gid2group, group2gid, getosusername
from .base import ENOATTR, API_VERSION
from .base import SaveFile, sync_dir, fdatasync, safe_fadvise
from .base import get_process_id, fqdn, hostname, hostid
if is_linux: # pragma: linux only
from .linux import API_VERSION as OS_API_VERSION
@ -33,11 +16,48 @@ if is_linux: # pragma: linux only
from .linux import acl_get, acl_set
from .linux import set_flags, get_flags
from .linux import SyncFile
from .posix import process_alive, local_pid_alive
from .posix import swidth
from .posix import get_errno
from .posix import uid2user, user2uid, gid2group, group2gid, getosusername
elif is_freebsd: # pragma: freebsd only
from .freebsd import API_VERSION as OS_API_VERSION
from .freebsd import listxattr, getxattr, setxattr
from .freebsd import acl_get, acl_set
from .base import set_flags, get_flags
from .base import SyncFile
from .posix import process_alive, local_pid_alive
from .posix import swidth
from .posix import get_errno
from .posix import uid2user, user2uid, gid2group, group2gid, getosusername
elif is_darwin: # pragma: darwin only
from .darwin import API_VERSION as OS_API_VERSION
from .darwin import listxattr, getxattr, setxattr
from .darwin import acl_get, acl_set
from .base import set_flags, get_flags
from .base import SyncFile
from .posix import process_alive, local_pid_alive
from .posix import swidth
from .posix import get_errno
from .posix import uid2user, user2uid, gid2group, group2gid, getosusername
elif not is_win32: # pragma: posix only
# generic stuff for all other posix OSes
OS_API_VERSION = API_VERSION
from .base import listxattr, getxattr, setxattr
from .base import acl_get, acl_set
from .base import set_flags, get_flags
from .base import SyncFile
from .posix import process_alive, local_pid_alive
from .posix import swidth
from .posix import get_errno
from .posix import uid2user, user2uid, gid2group, group2gid, getosusername
else: # pragma: win32 only
# win32 specific stuff
OS_API_VERSION = API_VERSION
from .base import listxattr, getxattr, setxattr
from .base import acl_get, acl_set
from .base import set_flags, get_flags
from .base import SyncFile
from .windows import process_alive, local_pid_alive
from .base import swidth
from .windows import uid2user, user2uid, gid2group, group2gid, getosusername

View file

@ -82,7 +82,7 @@ def acl_set(path, item, numeric_ids=False, fd=None):
try:
from os import lchflags
from os import lchflags # type: ignore[attr-defined]
def set_flags(path, bsd_flags, fd=None):
lchflags(path, bsd_flags)
@ -323,15 +323,3 @@ def get_process_id():
thread_id = 0
pid = os.getpid()
return hostid, pid, thread_id
def process_alive(host, pid, thread):
"""
Check if the (host, pid, thread_id) combination corresponds to a potentially alive process.
"""
raise NotImplementedError
def local_pid_alive(pid):
"""Return whether *pid* is alive."""
raise NotImplementedError

View file

@ -495,7 +495,7 @@ def api(*, since, **kwargs_decorator):
class RemoteRepository:
extra_test_args = []
extra_test_args = [] # type: ignore
class RPCError(Exception):
def __init__(self, unpacked):

View file

@ -168,7 +168,7 @@ class BaseTestCase(unittest.TestCase):
if raises:
assert_raises = staticmethod(raises)
else:
assert_raises = unittest.TestCase.assertRaises
assert_raises = unittest.TestCase.assertRaises # type: ignore
@contextmanager
def assert_creates_file(self, path):

View file

@ -250,7 +250,7 @@ def test_disk_full(cmd):
class ArchiverTestCaseBase(BaseTestCase):
EXE = None # python source based
EXE: str = None # python source based
FORK_DEFAULT = False
prefix = ""

View file

@ -38,7 +38,7 @@ other::r--
"ascii"
)
_acls_working = None
# _acls_working = None
def fakeroot_detected():

View file

@ -113,9 +113,9 @@ class TestRepositoryCache:
# Force cache to back off
qsl = cache.query_size_limit
cache.query_size_limit = query_size_limit
cache.query_size_limit = query_size_limit # type: ignore[assignment]
cache.backoff()
cache.query_size_limit = qsl
cache.query_size_limit = qsl # type: ignore[assignment]
# Evicted H(1) and H(2)
assert cache.evictions == 2
assert H(1) not in cache.cache

View file

@ -29,3 +29,12 @@ changedir =
deps =
flake8
commands = flake8 src scripts conftest.py
[testenv:mypy]
changedir =
deps =
pytest
mypy
pkgconfig
types-python-dateutil
commands = mypy