File size: 2,241 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from subprocess import Popen
from typing import (
    IO,
    Any,
    BinaryIO,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

# Global variables
info: Dict[str, Any]
gopen_schemes: Dict[str, Callable]

T = TypeVar('T')

class Pipe:
    ignore_errors: bool
    ignore_status: List[int]
    timeout: float
    args: Tuple[Tuple, Dict[str, Any]]
    proc: Popen
    stream: Union[IO[bytes], BinaryIO]
    status: Optional[int]
    handler: Optional[Callable]

    def __init__(
        self,
        *args: Any,
        mode: Optional[str] = None,
        timeout: float = 7200.0,
        ignore_errors: bool = False,
        ignore_status: List[int] = [],
        **kw: Any
    ) -> None: ...

    def __str__(self) -> str: ...
    def check_status(self) -> None: ...
    def wait_for_child(self) -> None: ...
    def read(self, *args: Any, **kw: Any) -> bytes: ...
    def write(self, *args: Any, **kw: Any) -> int: ...
    def readLine(self, *args: Any, **kw: Any) -> bytes: ...
    def close(self) -> None: ...
    def __enter__(self) -> 'Pipe': ...
    def __exit__(self, etype: Any, value: Any, traceback: Any) -> None: ...
    def __del__(self) -> None: ...

def set_options(
    obj: Any,
    timeout: Optional[float] = None,
    ignore_errors: Optional[bool] = None,
    ignore_status: Optional[List[int]] = None,
    handler: Optional[Callable] = None
) -> bool: ...

def gopen_file(url: str, mode: str = "rb", bufsize: int = 8192) -> IO[bytes]: ...
def gopen_pipe(url: str, mode: str = "rb", bufsize: int = 8192) -> Pipe: ...
def gopen_curl(url: str, mode: str = "rb", bufsize: int = 8192) -> Pipe: ...
def gopen_htgs(url: str, mode: str = "rb", bufsize: int = 8192) -> Pipe: ...
def gopen_hf(url: str, mode: str = "rb", bufsize: int = 8192) -> Pipe: ...
def gopen_gsutil(url: str, mode: str = "rb", bufsize: int = 8192) -> Pipe: ...
def gopen_ais(url: str, mode: str = "rb", bufsize: int = 8192) -> Pipe: ...
def gopen_error(url: str, *args: Any, **kw: Any) -> None: ...
def rewrite_url(url: str) -> str: ...

def gopen(
    url: str,
    mode: str = "rb",
    bufsize: int = 8192,
    **kw: Any
) -> Union[BinaryIO, Pipe]: ...

def reader(url: str, **kw: Any) -> Union[BinaryIO, Pipe]: ...