|
|
|
|
|
|
|
|
|
import sys |
|
if sys.version_info[1] > 5: |
|
from typing import TextIO |
|
else: |
|
from typing.io import TextIO |
|
from antlr4.BufferedTokenStream import TokenStream |
|
from antlr4.CommonTokenFactory import TokenFactory |
|
from antlr4.error.ErrorStrategy import DefaultErrorStrategy |
|
from antlr4.InputStream import InputStream |
|
from antlr4.Recognizer import Recognizer |
|
from antlr4.RuleContext import RuleContext |
|
from antlr4.ParserRuleContext import ParserRuleContext |
|
from antlr4.Token import Token |
|
from antlr4.Lexer import Lexer |
|
from antlr4.atn.ATNDeserializer import ATNDeserializer |
|
from antlr4.atn.ATNDeserializationOptions import ATNDeserializationOptions |
|
from antlr4.error.Errors import UnsupportedOperationException, RecognitionException |
|
from antlr4.tree.ParseTreePatternMatcher import ParseTreePatternMatcher |
|
from antlr4.tree.Tree import ParseTreeListener, TerminalNode, ErrorNode |
|
|
|
class TraceListener(ParseTreeListener): |
|
__slots__ = '_parser' |
|
|
|
def __init__(self, parser): |
|
self._parser = parser |
|
|
|
def enterEveryRule(self, ctx): |
|
print("enter " + self._parser.ruleNames[ctx.getRuleIndex()] + ", LT(1)=" + self._parser._input.LT(1).text, file=self._parser._output) |
|
|
|
def visitTerminal(self, node): |
|
|
|
print("consume " + str(node.symbol) + " rule " + self._parser.ruleNames[self._parser._ctx.getRuleIndex()], file=self._parser._output) |
|
|
|
def visitErrorNode(self, node): |
|
pass |
|
|
|
|
|
def exitEveryRule(self, ctx): |
|
print("exit " + self._parser.ruleNames[ctx.getRuleIndex()] + ", LT(1)=" + self._parser._input.LT(1).text, file=self._parser._output) |
|
|
|
|
|
|
|
class Parser (Recognizer): |
|
__slots__ = ( |
|
'_input', '_output', '_errHandler', '_precedenceStack', '_ctx', |
|
'buildParseTrees', '_tracer', '_parseListeners', '_syntaxErrors' |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
bypassAltsAtnCache = dict() |
|
|
|
def __init__(self, input:TokenStream, output:TextIO = sys.stdout): |
|
super().__init__() |
|
|
|
self._input = None |
|
self._output = output |
|
|
|
|
|
self._errHandler = DefaultErrorStrategy() |
|
self._precedenceStack = list() |
|
self._precedenceStack.append(0) |
|
|
|
|
|
self._ctx = None |
|
|
|
|
|
self.buildParseTrees = True |
|
|
|
|
|
|
|
|
|
|
|
self._tracer = None |
|
|
|
|
|
self._parseListeners = None |
|
|
|
|
|
self._syntaxErrors = 0 |
|
self.setInputStream(input) |
|
|
|
|
|
def reset(self): |
|
if self._input is not None: |
|
self._input.seek(0) |
|
self._errHandler.reset(self) |
|
self._ctx = None |
|
self._syntaxErrors = 0 |
|
self.setTrace(False) |
|
self._precedenceStack = list() |
|
self._precedenceStack.append(0) |
|
if self._interp is not None: |
|
self._interp.reset() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match(self, ttype:int): |
|
t = self.getCurrentToken() |
|
if t.type==ttype: |
|
self._errHandler.reportMatch(self) |
|
self.consume() |
|
else: |
|
t = self._errHandler.recoverInline(self) |
|
if self.buildParseTrees and t.tokenIndex==-1: |
|
|
|
|
|
self._ctx.addErrorNode(t) |
|
return t |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def matchWildcard(self): |
|
t = self.getCurrentToken() |
|
if t.type > 0: |
|
self._errHandler.reportMatch(self) |
|
self.consume() |
|
else: |
|
t = self._errHandler.recoverInline(self) |
|
if self.buildParseTrees and t.tokenIndex == -1: |
|
|
|
|
|
self._ctx.addErrorNode(t) |
|
|
|
return t |
|
|
|
def getParseListeners(self): |
|
return list() if self._parseListeners is None else self._parseListeners |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def addParseListener(self, listener:ParseTreeListener): |
|
if listener is None: |
|
raise ReferenceError("listener") |
|
if self._parseListeners is None: |
|
self._parseListeners = [] |
|
self._parseListeners.append(listener) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def removeParseListener(self, listener:ParseTreeListener): |
|
if self._parseListeners is not None: |
|
self._parseListeners.remove(listener) |
|
if len(self._parseListeners)==0: |
|
self._parseListeners = None |
|
|
|
|
|
def removeParseListeners(self): |
|
self._parseListeners = None |
|
|
|
|
|
def triggerEnterRuleEvent(self): |
|
if self._parseListeners is not None: |
|
for listener in self._parseListeners: |
|
listener.enterEveryRule(self._ctx) |
|
self._ctx.enterRule(listener) |
|
|
|
|
|
|
|
|
|
|
|
|
|
def triggerExitRuleEvent(self): |
|
if self._parseListeners is not None: |
|
|
|
for listener in reversed(self._parseListeners): |
|
self._ctx.exitRule(listener) |
|
listener.exitEveryRule(self._ctx) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getNumberOfSyntaxErrors(self): |
|
return self._syntaxErrors |
|
|
|
def getTokenFactory(self): |
|
return self._input.tokenSource._factory |
|
|
|
|
|
def setTokenFactory(self, factory:TokenFactory): |
|
self._input.tokenSource._factory = factory |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getATNWithBypassAlts(self): |
|
serializedAtn = self.getSerializedATN() |
|
if serializedAtn is None: |
|
raise UnsupportedOperationException("The current parser does not support an ATN with bypass alternatives.") |
|
result = self.bypassAltsAtnCache.get(serializedAtn, None) |
|
if result is None: |
|
deserializationOptions = ATNDeserializationOptions() |
|
deserializationOptions.generateRuleBypassTransitions = True |
|
result = ATNDeserializer(deserializationOptions).deserialize(serializedAtn) |
|
self.bypassAltsAtnCache[serializedAtn] = result |
|
return result |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compileParseTreePattern(self, pattern:str, patternRuleIndex:int, lexer:Lexer = None): |
|
if lexer is None: |
|
if self.getTokenStream() is not None: |
|
tokenSource = self.getTokenStream().tokenSource |
|
if isinstance( tokenSource, Lexer ): |
|
lexer = tokenSource |
|
if lexer is None: |
|
raise UnsupportedOperationException("Parser can't discover a lexer to use") |
|
|
|
m = ParseTreePatternMatcher(lexer, self) |
|
return m.compile(pattern, patternRuleIndex) |
|
|
|
|
|
def getInputStream(self): |
|
return self.getTokenStream() |
|
|
|
def setInputStream(self, input:InputStream): |
|
self.setTokenStream(input) |
|
|
|
def getTokenStream(self): |
|
return self._input |
|
|
|
|
|
def setTokenStream(self, input:TokenStream): |
|
self._input = None |
|
self.reset() |
|
self._input = input |
|
|
|
|
|
|
|
|
|
def getCurrentToken(self): |
|
return self._input.LT(1) |
|
|
|
def notifyErrorListeners(self, msg:str, offendingToken:Token = None, e:RecognitionException = None): |
|
if offendingToken is None: |
|
offendingToken = self.getCurrentToken() |
|
self._syntaxErrors += 1 |
|
line = offendingToken.line |
|
column = offendingToken.column |
|
listener = self.getErrorListenerDispatch() |
|
listener.syntaxError(self, offendingToken, line, column, msg, e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def consume(self): |
|
o = self.getCurrentToken() |
|
if o.type != Token.EOF: |
|
self.getInputStream().consume() |
|
hasListener = self._parseListeners is not None and len(self._parseListeners)>0 |
|
if self.buildParseTrees or hasListener: |
|
if self._errHandler.inErrorRecoveryMode(self): |
|
node = self._ctx.addErrorNode(o) |
|
else: |
|
node = self._ctx.addTokenNode(o) |
|
if hasListener: |
|
for listener in self._parseListeners: |
|
if isinstance(node, ErrorNode): |
|
listener.visitErrorNode(node) |
|
elif isinstance(node, TerminalNode): |
|
listener.visitTerminal(node) |
|
return o |
|
|
|
def addContextToParseTree(self): |
|
|
|
if self._ctx.parentCtx is not None: |
|
self._ctx.parentCtx.addChild(self._ctx) |
|
|
|
|
|
|
|
|
|
def enterRule(self, localctx:ParserRuleContext , state:int , ruleIndex:int): |
|
self.state = state |
|
self._ctx = localctx |
|
self._ctx.start = self._input.LT(1) |
|
if self.buildParseTrees: |
|
self.addContextToParseTree() |
|
if self._parseListeners is not None: |
|
self.triggerEnterRuleEvent() |
|
|
|
def exitRule(self): |
|
self._ctx.stop = self._input.LT(-1) |
|
|
|
if self._parseListeners is not None: |
|
self.triggerExitRuleEvent() |
|
self.state = self._ctx.invokingState |
|
self._ctx = self._ctx.parentCtx |
|
|
|
def enterOuterAlt(self, localctx:ParserRuleContext, altNum:int): |
|
localctx.setAltNumber(altNum) |
|
|
|
|
|
if self.buildParseTrees and self._ctx != localctx: |
|
if self._ctx.parentCtx is not None: |
|
self._ctx.parentCtx.removeLastChild() |
|
self._ctx.parentCtx.addChild(localctx) |
|
self._ctx = localctx |
|
|
|
|
|
|
|
|
|
|
|
|
|
def getPrecedence(self): |
|
if len(self._precedenceStack)==0: |
|
return -1 |
|
else: |
|
return self._precedenceStack[-1] |
|
|
|
def enterRecursionRule(self, localctx:ParserRuleContext, state:int, ruleIndex:int, precedence:int): |
|
self.state = state |
|
self._precedenceStack.append(precedence) |
|
self._ctx = localctx |
|
self._ctx.start = self._input.LT(1) |
|
if self._parseListeners is not None: |
|
self.triggerEnterRuleEvent() |
|
|
|
|
|
|
|
|
|
def pushNewRecursionContext(self, localctx:ParserRuleContext, state:int, ruleIndex:int): |
|
previous = self._ctx |
|
previous.parentCtx = localctx |
|
previous.invokingState = state |
|
previous.stop = self._input.LT(-1) |
|
|
|
self._ctx = localctx |
|
self._ctx.start = previous.start |
|
if self.buildParseTrees: |
|
self._ctx.addChild(previous) |
|
|
|
if self._parseListeners is not None: |
|
self.triggerEnterRuleEvent() |
|
|
|
def unrollRecursionContexts(self, parentCtx:ParserRuleContext): |
|
self._precedenceStack.pop() |
|
self._ctx.stop = self._input.LT(-1) |
|
retCtx = self._ctx |
|
|
|
if self._parseListeners is not None: |
|
while self._ctx is not parentCtx: |
|
self.triggerExitRuleEvent() |
|
self._ctx = self._ctx.parentCtx |
|
else: |
|
self._ctx = parentCtx |
|
|
|
|
|
retCtx.parentCtx = parentCtx |
|
|
|
if self.buildParseTrees and parentCtx is not None: |
|
|
|
parentCtx.addChild(retCtx) |
|
|
|
def getInvokingContext(self, ruleIndex:int): |
|
ctx = self._ctx |
|
while ctx is not None: |
|
if ctx.getRuleIndex() == ruleIndex: |
|
return ctx |
|
ctx = ctx.parentCtx |
|
return None |
|
|
|
|
|
def precpred(self, localctx:RuleContext , precedence:int): |
|
return precedence >= self._precedenceStack[-1] |
|
|
|
def inContext(self, context:str): |
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def isExpectedToken(self, symbol:int): |
|
atn = self._interp.atn |
|
ctx = self._ctx |
|
s = atn.states[self.state] |
|
following = atn.nextTokens(s) |
|
if symbol in following: |
|
return True |
|
if not Token.EPSILON in following: |
|
return False |
|
|
|
while ctx is not None and ctx.invokingState>=0 and Token.EPSILON in following: |
|
invokingState = atn.states[ctx.invokingState] |
|
rt = invokingState.transitions[0] |
|
following = atn.nextTokens(rt.followState) |
|
if symbol in following: |
|
return True |
|
ctx = ctx.parentCtx |
|
|
|
if Token.EPSILON in following and symbol == Token.EOF: |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getExpectedTokens(self): |
|
return self._interp.atn.getExpectedTokens(self.state, self._ctx) |
|
|
|
def getExpectedTokensWithinCurrentRule(self): |
|
atn = self._interp.atn |
|
s = atn.states[self.state] |
|
return atn.nextTokens(s) |
|
|
|
|
|
def getRuleIndex(self, ruleName:str): |
|
ruleIndex = self.getRuleIndexMap().get(ruleName, None) |
|
if ruleIndex is not None: |
|
return ruleIndex |
|
else: |
|
return -1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getRuleInvocationStack(self, p:RuleContext=None): |
|
if p is None: |
|
p = self._ctx |
|
stack = list() |
|
while p is not None: |
|
|
|
ruleIndex = p.getRuleIndex() |
|
if ruleIndex<0: |
|
stack.append("n/a") |
|
else: |
|
stack.append(self.ruleNames[ruleIndex]) |
|
p = p.parentCtx |
|
return stack |
|
|
|
|
|
def getDFAStrings(self): |
|
return [ str(dfa) for dfa in self._interp.decisionToDFA] |
|
|
|
|
|
def dumpDFA(self): |
|
seenOne = False |
|
for i in range(0, len(self._interp.decisionToDFA)): |
|
dfa = self._interp.decisionToDFA[i] |
|
if len(dfa.states)>0: |
|
if seenOne: |
|
print(file=self._output) |
|
print("Decision " + str(dfa.decision) + ":", file=self._output) |
|
print(dfa.toString(self.literalNames, self.symbolicNames), end='', file=self._output) |
|
seenOne = True |
|
|
|
|
|
def getSourceName(self): |
|
return self._input.sourceName |
|
|
|
|
|
|
|
|
|
def setTrace(self, trace:bool): |
|
if not trace: |
|
self.removeParseListener(self._tracer) |
|
self._tracer = None |
|
else: |
|
if self._tracer is not None: |
|
self.removeParseListener(self._tracer) |
|
self._tracer = TraceListener(self) |
|
self.addParseListener(self._tracer) |
|
|