File size: 2,198 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union

from .handlers import reraise_exception
from .utils import obsolete

# Constants
default_cache_dir: str
default_cache_size: float
verbose_cache: int

# Functions
def islocal(url: str) -> bool: ...
def get_filetype(fname: str) -> str: ...
def magic_filetype(fname: str) -> str: ...
def check_tar_format(fname: str) -> bool: ...

@obsolete
def pipe_cleaner(spec: str) -> str: ...

def url_to_cache_name(url: str, ndir: int = 0) -> str: ...

# Classes
class LRUCleanup:
    cache_dir: Optional[str]
    cache_size: int
    keyfn: Callable[[str], float]
    verbose: bool
    interval: Optional[int]
    last_run: float

    def __init__(
        self,
        cache_dir: Optional[str] = None,
        cache_size: int = int(1e12),
        keyfn: Callable[[str], float] = os.path.getctime,
        verbose: bool = False,
        interval: Optional[int] = 30,
    ) -> None: ...

    def set_cache_dir(self, cache_dir: str) -> None: ...
    def cleanup(self) -> None: ...

def download(url: str, dest: str, chunk_size: int = 1024**2, verbose: bool = False) -> None: ...

class StreamingOpen:
    verbose: bool
    handler: Callable[[Exception], bool]

    def __init__(self, verbose: bool = False, handler: Callable[[Exception], bool] = reraise_exception) -> None: ...
    def __call__(self, urls: Iterable[Union[str, Dict[str, str]]]) -> Iterator[Dict[str, Any]]: ...

class FileCache:
    url_to_name: Callable[[str], str]
    validator: Callable[[str], bool]
    handler: Callable[[Exception], bool]
    cache_dir: str
    verbose: bool
    cleaner: Optional[LRUCleanup]

    def __init__(
        self,
        cache_dir: Optional[str] = None,
        *,
        url_to_name: Callable[[str], str] = url_to_cache_name,
        verbose: bool = False,
        validator: Callable[[str], bool] = check_tar_format,
        handler: Callable[[Exception], bool] = reraise_exception,
        cache_size: int = -1,
        cache_cleanup_interval: int = 30,
    ) -> None: ...

    def get_file(self, url: str) -> str: ...
    def __call__(self, urls: Iterable[Union[str, Dict[str, str]]]) -> Iterator[Dict[str, Any]]: ...