|
from __future__ import annotations |
|
|
|
from copy import copy |
|
from typing import Any |
|
|
|
from tomlkit.exceptions import ParseError |
|
from tomlkit.exceptions import UnexpectedCharError |
|
from tomlkit.toml_char import TOMLChar |
|
|
|
|
|
class _State: |
|
def __init__( |
|
self, |
|
source: Source, |
|
save_marker: bool | None = False, |
|
restore: bool | None = False, |
|
) -> None: |
|
self._source = source |
|
self._save_marker = save_marker |
|
self.restore = restore |
|
|
|
def __enter__(self) -> _State: |
|
|
|
self._chars = copy(self._source._chars) |
|
self._idx = self._source._idx |
|
self._current = self._source._current |
|
self._marker = self._source._marker |
|
|
|
return self |
|
|
|
def __exit__(self, exception_type, exception_val, trace): |
|
|
|
if self.restore or exception_type: |
|
self._source._chars = self._chars |
|
self._source._idx = self._idx |
|
self._source._current = self._current |
|
if self._save_marker: |
|
self._source._marker = self._marker |
|
|
|
|
|
class _StateHandler: |
|
""" |
|
State preserver for the Parser. |
|
""" |
|
|
|
def __init__(self, source: Source) -> None: |
|
self._source = source |
|
self._states = [] |
|
|
|
def __call__(self, *args, **kwargs): |
|
return _State(self._source, *args, **kwargs) |
|
|
|
def __enter__(self) -> _State: |
|
state = self() |
|
self._states.append(state) |
|
return state.__enter__() |
|
|
|
def __exit__(self, exception_type, exception_val, trace): |
|
state = self._states.pop() |
|
return state.__exit__(exception_type, exception_val, trace) |
|
|
|
|
|
class Source(str): |
|
EOF = TOMLChar("\0") |
|
|
|
def __init__(self, _: str) -> None: |
|
super().__init__() |
|
|
|
|
|
self._chars = iter([(i, TOMLChar(c)) for i, c in enumerate(self)]) |
|
|
|
self._idx = 0 |
|
self._marker = 0 |
|
self._current = TOMLChar("") |
|
|
|
self._state = _StateHandler(self) |
|
|
|
self.inc() |
|
|
|
def reset(self): |
|
|
|
self.inc() |
|
|
|
|
|
self.mark() |
|
|
|
@property |
|
def state(self) -> _StateHandler: |
|
return self._state |
|
|
|
@property |
|
def idx(self) -> int: |
|
return self._idx |
|
|
|
@property |
|
def current(self) -> TOMLChar: |
|
return self._current |
|
|
|
@property |
|
def marker(self) -> int: |
|
return self._marker |
|
|
|
def extract(self) -> str: |
|
""" |
|
Extracts the value between marker and index |
|
""" |
|
return self[self._marker : self._idx] |
|
|
|
def inc(self, exception: type[ParseError] | None = None) -> bool: |
|
""" |
|
Increments the parser if the end of the input has not been reached. |
|
Returns whether or not it was able to advance. |
|
""" |
|
try: |
|
self._idx, self._current = next(self._chars) |
|
|
|
return True |
|
except StopIteration: |
|
self._idx = len(self) |
|
self._current = self.EOF |
|
if exception: |
|
raise self.parse_error(exception) from None |
|
|
|
return False |
|
|
|
def inc_n(self, n: int, exception: type[ParseError] | None = None) -> bool: |
|
""" |
|
Increments the parser by n characters |
|
if the end of the input has not been reached. |
|
""" |
|
return all(self.inc(exception=exception) for _ in range(n)) |
|
|
|
def consume(self, chars, min=0, max=-1): |
|
""" |
|
Consume chars until min/max is satisfied is valid. |
|
""" |
|
while self.current in chars and max != 0: |
|
min -= 1 |
|
max -= 1 |
|
if not self.inc(): |
|
break |
|
|
|
|
|
if min > 0: |
|
raise self.parse_error(UnexpectedCharError, self.current) |
|
|
|
def end(self) -> bool: |
|
""" |
|
Returns True if the parser has reached the end of the input. |
|
""" |
|
return self._current is self.EOF |
|
|
|
def mark(self) -> None: |
|
""" |
|
Sets the marker to the index's current position |
|
""" |
|
self._marker = self._idx |
|
|
|
def parse_error( |
|
self, |
|
exception: type[ParseError] = ParseError, |
|
*args: Any, |
|
**kwargs: Any, |
|
) -> ParseError: |
|
""" |
|
Creates a generic "parse error" at the current position. |
|
""" |
|
line, col = self._to_linecol() |
|
|
|
return exception(line, col, *args, **kwargs) |
|
|
|
def _to_linecol(self) -> tuple[int, int]: |
|
cur = 0 |
|
for i, line in enumerate(self.splitlines()): |
|
if cur + len(line) + 1 > self.idx: |
|
return (i + 1, self.idx - cur) |
|
|
|
cur += len(line) + 1 |
|
|
|
return len(self.splitlines()), 0 |
|
|