# File size: 2,198 Bytes | 9c6594c
import os
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union
from .handlers import reraise_exception
from .utils import obsolete
# Constants
# Default cache directory path; only the type is declared here, no value is assigned in this file.
default_cache_dir: str
# Default cache size, declared as a float. NOTE(review): unit is not visible here (presumably bytes) — confirm.
default_cache_size: float
# Integer verbosity setting for caching. NOTE(review): how the levels are interpreted is not visible in this stub.
verbose_cache: int
# Functions
# Signature stub: takes a URL string and returns a bool.
# NOTE(review): by its name this reports whether the URL points at a local
# resource; the exact rule is not visible here — confirm against the implementation.
def islocal(
    url: str,
) -> bool: ...
# Signature stub: takes a file name and returns a string.
# NOTE(review): presumably the returned string describes the file's type; how
# it is determined and its format are not visible here — confirm.
def get_filetype(
    fname: str,
) -> str: ...
# Signature stub: takes a file name and returns a string.
# NOTE(review): the name suggests detection from the file's leading "magic"
# bytes rather than its extension — confirm against the implementation.
def magic_filetype(
    fname: str,
) -> str: ...
# Signature stub: takes a file name and returns a bool.
# Serves as the default `validator` argument of `FileCache.__init__`.
# NOTE(review): presumably True means the file looks like a tar archive — confirm.
def check_tar_format(
    fname: str,
) -> bool: ...
# Signature stub: takes a spec string and returns a string.
# Wrapped by the `obsolete` decorator imported from `.utils`.
# NOTE(review): what the decorator does at call time and how `spec` is
# transformed are not visible in this stub — confirm before relying on it.
@obsolete
def pipe_cleaner(
    spec: str,
) -> str: ...
# Signature stub: maps a URL string to a string; `ndir` is an int defaulting to 0.
# Serves as the default `url_to_name` argument of `FileCache.__init__`.
# NOTE(review): `ndir` presumably controls how many directory components of
# the URL are kept in the resulting name — confirm against the implementation.
def url_to_cache_name(
    url: str,
    ndir: int = 0,
) -> str: ...
# Classes
# Cache-cleanup helper (type stub: signatures only, bodies elided with `...`).
# NOTE(review): the name suggests least-recently-used eviction of files in
# `cache_dir` once `cache_size` is exceeded; the actual policy is not visible
# in this stub — confirm against the implementation.
class LRUCleanup:
    # Directory to manage; may be None (which is also the constructor default).
    cache_dir: Optional[str]
    # Size limit; constructor default is int(1e12). Unit presumably bytes — confirm.
    cache_size: int
    # Maps a path string to a float; constructor default is os.path.getctime.
    keyfn: Callable[[str], float]
    verbose: bool
    # Constructor default is 30; None is permitted.
    # NOTE(review): presumably the minimum number of seconds between cleanup runs — confirm.
    interval: Optional[int]
    # NOTE(review): presumably the timestamp of the most recent cleanup — confirm.
    last_run: float

    def __init__(
        self,
        cache_dir: Optional[str] = None,
        cache_size: int = int(1e12),
        keyfn: Callable[[str], float] = os.path.getctime,
        verbose: bool = False,
        interval: Optional[int] = 30,
    ) -> None: ...

    # Takes a directory path string; returns None.
    def set_cache_dir(self, cache_dir: str) -> None: ...

    # Takes no arguments beyond self; returns None.
    def cleanup(self) -> None: ...
# Signature stub: takes a source URL and a destination string, plus an int
# `chunk_size` (default 1024**2) and a `verbose` flag (default False); returns None.
# NOTE(review): presumably copies the content at `url` to the path `dest` in
# `chunk_size`-byte pieces — the transfer mechanism is not visible here, confirm.
def download(
    url: str,
    dest: str,
    chunk_size: int = 1024**2,
    verbose: bool = False,
) -> None: ...
# Callable object (type stub: signatures only, bodies elided with `...`).
# Per `__call__`, it consumes an iterable whose items are either strings or
# str-to-str dicts and produces an iterator of dicts.
# NOTE(review): presumably each input URL is opened as a stream and attached
# to the yielded dict — the keys of that dict are not visible here, confirm.
class StreamingOpen:
    verbose: bool
    # Called with an Exception and returns a bool; the constructor default is
    # `reraise_exception` from `.handlers`.
    # NOTE(review): the meaning of the bool result is not visible in this stub.
    handler: Callable[[Exception], bool]

    def __init__(
        self,
        verbose: bool = False,
        handler: Callable[[Exception], bool] = reraise_exception,
    ) -> None: ...

    def __call__(
        self,
        urls: Iterable[Union[str, Dict[str, str]]],
    ) -> Iterator[Dict[str, Any]]: ...
# File cache keyed by URL (type stub: signatures only, bodies elided with `...`).
# Per `__call__`, it consumes an iterable whose items are either strings or
# str-to-str dicts and produces an iterator of dicts; `get_file` maps a single
# URL string to a string.
# NOTE(review): presumably remote files are downloaded into `cache_dir` and
# served from there afterwards — the caching policy is not visible here, confirm.
class FileCache:
    # Maps a URL string to a name string; constructor default is `url_to_cache_name`.
    url_to_name: Callable[[str], str]
    # Takes a file name and returns a bool; constructor default is `check_tar_format`.
    validator: Callable[[str], bool]
    # Called with an Exception and returns a bool; constructor default is
    # `reraise_exception` from `.handlers`.
    handler: Callable[[Exception], bool]
    # Always a str on the instance even though the constructor accepts None.
    # NOTE(review): presumably None falls back to `default_cache_dir` — confirm.
    cache_dir: str
    verbose: bool
    # Optional cleanup helper; may be None.
    # NOTE(review): presumably created only when `cache_size` is set — confirm.
    cleaner: Optional[LRUCleanup]

    # Everything after `cache_dir` is keyword-only.
    def __init__(
        self,
        cache_dir: Optional[str] = None,
        *,
        url_to_name: Callable[[str], str] = url_to_cache_name,
        verbose: bool = False,
        validator: Callable[[str], bool] = check_tar_format,
        handler: Callable[[Exception], bool] = reraise_exception,
        cache_size: int = -1,
        cache_cleanup_interval: int = 30,
    ) -> None: ...

    # Takes a URL string and returns a string.
    # NOTE(review): presumably the local path of the cached copy — confirm.
    def get_file(self, url: str) -> str: ...

    def __call__(
        self,
        urls: Iterable[Union[str, Dict[str, str]]],
    ) -> Iterator[Dict[str, Any]]: ...
|