File size: 4,047 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
from __future__ import annotations
from collections.abc import Generator
class StreamReader:
    """
    Generator-based stream reader.

    Data is pushed into the reader with :meth:`feed_data` and
    :meth:`feed_eof`. The reading methods are generator-based coroutines:
    they ``yield`` whenever more data is needed and return their result
    once it is available.

    Data returned by the reading methods is sliced out of the internal
    :class:`bytearray` buffer, so the returned objects are bytes-like
    :class:`bytearray` instances.

    This class doesn't support concurrent calls to :meth:`read_line`,
    :meth:`read_exact`, or :meth:`read_to_eof`. Make sure calls are
    serialized.

    """

    def __init__(self) -> None:
        # Bytes fed so far that no reader has consumed yet.
        self.buffer = bytearray()
        # Becomes True when feed_eof() is called.
        self.eof = False

    def read_line(self, m: int) -> Generator[None, None, bytes]:
        """
        Read a LF-terminated line from the stream.

        This is a generator-based coroutine.

        The return value includes the LF character.

        Args:
            m: Maximum number of bytes to read; this is a security limit.

        Raises:
            EOFError: If the stream ends without a LF.
            RuntimeError: If more than ``m`` bytes are buffered without a
                LF, or if the line is longer than ``m`` bytes.

        """
        p = 0  # number of bytes already scanned without finding a newline
        while True:
            # Resume the search at p so earlier bytes aren't scanned again.
            # find() returns -1 when there's no LF, which makes n == 0.
            n = self.buffer.find(b"\n", p) + 1  # number of bytes to read
            if n > 0:
                break
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            if self.eof:
                raise EOFError(f"stream ends after {p} bytes, before end of line")
            yield
        # A LF was found, but the line itself may still exceed the limit.
        if n > m:
            raise RuntimeError(f"read {n} bytes, expected no more than {m} bytes")
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_exact(self, n: int) -> Generator[None, None, bytes]:
        """
        Read a given number of bytes from the stream.

        This is a generator-based coroutine.

        Args:
            n: How many bytes to read.

        Raises:
            EOFError: If the stream ends in less than ``n`` bytes.

        """
        assert n >= 0
        while len(self.buffer) < n:
            if self.eof:
                p = len(self.buffer)
                raise EOFError(f"stream ends after {p} bytes, expected {n} bytes")
            yield
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_to_eof(self, m: int) -> Generator[None, None, bytes]:
        """
        Read all bytes from the stream.

        This is a generator-based coroutine.

        Args:
            m: Maximum number of bytes to read; this is a security limit.

        Raises:
            RuntimeError: If the stream ends in more than ``m`` bytes.

        """
        while not self.eof:
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            yield
        # Data and EOF may both have been fed since the last check (or before
        # the first one), so the limit must be enforced once more here.
        p = len(self.buffer)
        if p > m:
            raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
        r = self.buffer[:]
        del self.buffer[:]
        return r

    def at_eof(self) -> Generator[None, None, bool]:
        """
        Tell whether the stream has ended and all data was read.

        This is a generator-based coroutine.

        """
        while True:
            if self.buffer:
                return False
            if self.eof:
                return True
            # When all data was read but the stream hasn't ended, we can't
            # tell until either feed_data() or feed_eof() is called.
            yield

    def feed_data(self, data: bytes) -> None:
        """
        Write data to the stream.

        :meth:`feed_data` cannot be called after :meth:`feed_eof`.

        Args:
            data: Data to write.

        Raises:
            EOFError: If the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.buffer += data

    def feed_eof(self) -> None:
        """
        End the stream.

        :meth:`feed_eof` cannot be called more than once.

        Raises:
            EOFError: If the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.eof = True

    def discard(self) -> None:
        """
        Discard all buffered data, but don't end the stream.

        """
        del self.buffer[:]
|