|
import re |
|
import threading |
|
from typing import Any |
|
|
|
from antlr4 import CommonTokenStream, InputStream, ParserRuleContext |
|
from antlr4.error.ErrorListener import ErrorListener |
|
|
|
from .errors import GrammarParseError |
|
|
|
|
|
|
|
from .grammar_visitor import ( |
|
OmegaConfGrammarLexer, |
|
OmegaConfGrammarParser, |
|
) |
|
|
|
|
|
|
|
_grammar_cache = threading.local() |
|
|
|
|
|
|
|
_config_key = r"[$\w]+" |
|
_key_maybe_brackets = f"{_config_key}|\\[{_config_key}\\]" |
|
_node_access = f"\\.{_key_maybe_brackets}" |
|
_node_path = f"(\\.)*({_key_maybe_brackets})({_node_access})*" |
|
_node_inter = f"\\${{\\s*{_node_path}\\s*}}" |
|
_id = "[a-zA-Z_][\\w\\-]*" |
|
_resolver_name = f"({_id}(\\.{_id})*)?" |
|
_arg = r"[a-zA-Z_0-9/\-\+.$%*@?|]+" |
|
_args = f"{_arg}(\\s*,\\s*{_arg})*" |
|
_resolver_inter = f"\\${{\\s*{_resolver_name}\\s*:\\s*{_args}?\\s*}}" |
|
_inter = f"({_node_inter}|{_resolver_inter})" |
|
_outer = "([^$]|\\$(?!{))+" |
|
SIMPLE_INTERPOLATION_PATTERN = re.compile( |
|
f"({_outer})?({_inter}({_outer})?)+$", flags=re.ASCII |
|
) |
|
|
|
|
|
|
|
|
|
|
|
class OmegaConfErrorListener(ErrorListener): |
|
def syntaxError( |
|
self, |
|
recognizer: Any, |
|
offending_symbol: Any, |
|
line: Any, |
|
column: Any, |
|
msg: Any, |
|
e: Any, |
|
) -> None: |
|
raise GrammarParseError(str(e) if msg is None else msg) from e |
|
|
|
def reportAmbiguity( |
|
self, |
|
recognizer: Any, |
|
dfa: Any, |
|
startIndex: Any, |
|
stopIndex: Any, |
|
exact: Any, |
|
ambigAlts: Any, |
|
configs: Any, |
|
) -> None: |
|
raise GrammarParseError("ANTLR error: Ambiguity") |
|
|
|
def reportAttemptingFullContext( |
|
self, |
|
recognizer: Any, |
|
dfa: Any, |
|
startIndex: Any, |
|
stopIndex: Any, |
|
conflictingAlts: Any, |
|
configs: Any, |
|
) -> None: |
|
|
|
|
|
|
|
|
|
raise GrammarParseError( |
|
"ANTLR error: Attempting Full Context" |
|
) |
|
|
|
def reportContextSensitivity( |
|
self, |
|
recognizer: Any, |
|
dfa: Any, |
|
startIndex: Any, |
|
stopIndex: Any, |
|
prediction: Any, |
|
configs: Any, |
|
) -> None: |
|
raise GrammarParseError("ANTLR error: ContextSensitivity") |
|
|
|
|
|
def parse( |
|
value: str, parser_rule: str = "configValue", lexer_mode: str = "DEFAULT_MODE" |
|
) -> ParserRuleContext: |
|
""" |
|
Parse interpolated string `value` (and return the parse tree). |
|
""" |
|
l_mode = getattr(OmegaConfGrammarLexer, lexer_mode) |
|
istream = InputStream(value) |
|
|
|
cached = getattr(_grammar_cache, "data", None) |
|
if cached is None: |
|
error_listener = OmegaConfErrorListener() |
|
lexer = OmegaConfGrammarLexer(istream) |
|
lexer.removeErrorListeners() |
|
lexer.addErrorListener(error_listener) |
|
lexer.mode(l_mode) |
|
token_stream = CommonTokenStream(lexer) |
|
parser = OmegaConfGrammarParser(token_stream) |
|
parser.removeErrorListeners() |
|
parser.addErrorListener(error_listener) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_grammar_cache.data = lexer, token_stream, parser |
|
|
|
else: |
|
lexer, token_stream, parser = cached |
|
|
|
lexer.inputStream = istream |
|
|
|
lexer.mode(l_mode) |
|
token_stream.setTokenSource(lexer) |
|
parser.reset() |
|
|
|
try: |
|
return getattr(parser, parser_rule)() |
|
except Exception as exc: |
|
if type(exc) is Exception and str(exc) == "Empty Stack": |
|
|
|
|
|
|
|
raise GrammarParseError("Empty Stack") |
|
else: |
|
raise |
|
|