# trace.py - tracing of the parsing process (for debugging) and
# interrupting long running parser processes.
#
# Copyright 2018 by Eckhart Arnold (arnold@badw.de)
# Bavarian Academy of Sciences an Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""
Module ``trace`` provides trace-debugging functionality for the
parser. The tracers are added or removed via monkey patching to
all or some parsers of a grammar and trace the actions
of these parsers, making use of the `call_stack__`, `history__`
and `moving_forward__`, `most_recent_error__`-hooks in the
Grammar-object.
This functionality can be used for several purposes:
1. "live" or "breakpoint"-debugging (not implemented)
2. recording of parsing history and "post-mortem"-debugging,
implemented here and in module :py:mod:`log`
3. Interrupting long-running parser processes by polling
a threading.Event or multiprocessing.Event once in a while
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Tuple, Optional, List, Iterable, Union, cast
try:
import cython
except ImportError:
import DHParser.externallibs.shadow_cython as cython
from DHParser.error import Error, RESUME_NOTICE, RECURSION_DEPTH_LIMIT_HIT
from DHParser.nodetree import Node, REGEXP_PTYPE, TOKEN_PTYPE, WHITESPACE_PTYPE
from DHParser.log import HistoryRecord, CallItem, NONE_NODE
from DHParser.parse import Grammar, Parser, ParserError, ParseFunc, ContextSensitive, \
UnaryParser, SmartRE, cancel_proxy, Forward, ParsingResult, SEED
from DHParser.toolkit import line_col, INFINITE
__all__ = ('trace_history', 'set_tracer', 'resume_notices_on', 'resume_notices_off')
#######################################################################
#
# History-Recording for post-mortem debugging
#
#######################################################################
def symbol_name(parser: Parser, grammar: Grammar) -> str:
name = str(parser) if isinstance(parser, ContextSensitive) else parser.node_name
# name = parser.name
if name[:1] == ':':
name = parser.symbol + '->' + name
# name = grammar.associated_symbol__(parser).pname + '->' + name
return name
def result_changed(node, history) -> bool:
current_result = node is None
if history:
last_result = history[-1].node is NONE_NODE
else:
return False
return last_result != current_result
def call_item(parser: Parser, location: int, prefix: str = '') -> Tuple[str, int]:
if parser.node_name in (REGEXP_PTYPE, TOKEN_PTYPE, ":Retrieve", ":Pop",
":SmartRE", ":SmartRE_Lookahead"):
return f"{' ' or prefix}{parser.repr}", location # ' ' added to avoid ':' as first char!
else:
name = parser.pname or parser.node_name
if isinstance(parser, SmartRE) and parser.is_lookahead():
name += ":SmartRE_Lookahead"
return f"{prefix}{name}", location
def history_record(parser: Parser, grammar: Grammar,
node: Union[Node, None],
location: int,
location_: int,
prefix: str = '') -> HistoryRecord:
doc = grammar.document__
hnd = Node(node.name, doc[location:location_]).with_pos(location) if node else None
lc = line_col(grammar.document_lbreaks__, location)
errors = grammar.tree__.error_nodes.get(id(node), [])
if parser.node_name == ":Forward" and grammar.call_stack__:
grammar.call_stack__[-1] = (f"{prefix}{parser.parser.pname or str(parser)}", location)
elif parser.node_name[0:1] == ':' \
and not (isinstance(parser, SmartRE) and parser.is_lookahead()):
item = (f"{prefix}{parser}" + ' ' * int(bool(parser.pname)) , location)
if grammar.call_stack__:
grammar.call_stack__[-1] = item
else:
grammar.call_stack__.append(item)
return HistoryRecord(grammar.call_stack__, hnd, doc[location_:], lc, errors)
def wrap_call_up(self: Parser, node: Optional[Node], location: int, location_: int,
force: bool = False):
"""Creates a history record after the parser call has been completed.
"""
# Don't track returning parsers except in case an error has occurred!
grammar = self._grammar
if ((self.node_name != WHITESPACE_PTYPE)
and (grammar.moving_forward__
or ((not self.disposable or force)
and (node and grammar.history__[-1].node))
or result_changed(node, grammar.history__))):
# record history
# TODO: Make dropping insignificant whitespace from history configurable
record = history_record(self, grammar, node, location, location_)
cs_len = len(record.call_stack)
if (not grammar.history__ or not node
or record.line_col != grammar.history__[-1].line_col
or record.call_stack != grammar.history__[-1].call_stack[:cs_len]
or self == grammar.start_parser__):
if len(record.call_stack) >= 2 and \
record.call_stack[-2][0] in (":Lookbehind", ":NegativeLookbehind"):
record.text = grammar.reversed__[len(grammar.document__) - location_:]
if not grammar.moving_forward__ \
and not any(tag in (':Lookahead', ":NegativeLookahead")
or tag.endswith(":SmartRE_Lookahead")
for tag, _ in grammar.history__[-1].call_stack):
pass
# grammar.history__.pop()
grammar.history__.append(record)
@cython.locals(location_=cython.int, delta=cython.int, cs_len=cython.int,
i=cython.int, L=cython.int)
def trace_history(self: Parser, location: cython.int) -> Tuple[Optional[Node], cython.int]:
grammar = self._grammar # type: Grammar
if not grammar.history_tracking__:
if location < 0:
if location <= -INFINITE: location = 0
return self.visited[-location]
try:
node, location = self._parse(location) # <===== call to the actual parser!
except ParserError as pe:
raise pe
return node, location
if location < 0: # a negative location signals a memo-hit. see parse.Parser.__call__() !!!
if location <= -INFINITE: location = 0
location = -location
grammar.call_stack__.append(call_item(self, location, "RECALL: "))
node, location_ = self.visited.get(location, (None, location))
record = history_record(self, grammar, node, location, location_, "RECALL: ")
grammar.history__.append(record)
grammar.call_stack__.pop()
return node, location_
mre: Optional[ParserError] = grammar.most_recent_error__
if mre is not None and location >= mre.error.pos:
# add resume notice (mind that skip notices are added by
# `parse.MandatoryElementsParser.mandatory_violation()`)
if mre.error.code == RECURSION_DEPTH_LIMIT_HIT:
return mre.node, location
grammar.most_recent_error__ = None
errors = [mre.error] # type: List[Error]
text_ = grammar.document__[mre.error.pos:]
orig_lc = line_col(grammar.document_lbreaks__, mre.error.pos)
orig_snippet= text_ if len(text_) <= 10 else text_[:7] + '...'
# orig_snippet = orig_rest if len(orig_rest) <= 10 else orig_rest[:7] + '...'
target_lc = line_col(grammar.document_lbreaks__, location)
target_text_ = grammar.document__[location:]
target_snippet = target_text_ if len(target_text_) <= 10 else target_text_[:7] + '...'
# target = text if len(text) <= 10 else text[:7] + '...'
if mre.first_throw:
# origin = callstack_as_str(mre.callstack_snapshot, depth=3)
# resumer = callstack_as_str(grammar.call_stack__, depth=3)
origin = symbol_name(mre.parser, grammar)
resumer = symbol_name(self, grammar)
notice = Error( # resume notice
'Resuming from {} at {}:{} {} with {} at {}:{} {}'
.format(origin, *orig_lc, repr(orig_snippet),
resumer, *target_lc, repr(target_snippet)),
location, RESUME_NOTICE)
else:
origin = symbol_name(self, grammar)
notice = Error( # skip notice
'Skipping from {}:{} {} within {} to {}:{} {}'
.format(*orig_lc, repr(orig_snippet), origin,
*target_lc, repr(target_snippet)),
location, RESUME_NOTICE)
if grammar.resume_notices__:
grammar.tree__.add_error(mre.node, notice)
errors.append(notice)
grammar.history__.append(HistoryRecord(
getattr(mre, 'callstack_snapshot', grammar.call_stack__), mre.node, text_,
line_col(grammar.document_lbreaks__, mre.error.pos), errors))
grammar.call_stack__.append(call_item(self, location))
stack_counter = 1
if self.node_name in (":Lookbehind", ":NegativeLookbehind"):
grammar.call_stack__.append((' ' + cast(UnaryParser, self).parser.repr, location))
stack_counter += 1
elif isinstance(self, SmartRE) and self.is_lookahead() \
and not grammar.call_stack__[-1][0].endswith(':SmartRE_Lookahead'):
grammar.call_stack__.append((' :SmartRE_Lookahead', location))
stack_counter += 1
grammar_suspend_memo_state = grammar.suspend_memoization__
grammar.moving_forward__ = True
try:
if grammar.cancel_query__ is not None:
node, location_ = cancel_proxy(self, location)
else:
#####################################################################################
node, location_ = self._parse(location) # <===== call to the actual parser!
#####################################################################################
except ParserError as pe:
if pe.first_throw:
pe.callstack_snapshot = grammar.call_stack__.copy()
grammar.most_recent_error__ = pe
if self == grammar.start_parser__ and grammar.most_recent_error__:
fe = grammar.most_recent_error__ # type: ParserError
lc = line_col(grammar.document_lbreaks__, fe.error.pos)
# TODO: get the call stack from when the error occurred, here
nd = fe.node
grammar.history__.append(
HistoryRecord(grammar.call_stack__, nd,
grammar.document__[fe.location + nd.strlen():],
lc, [fe.error]))
while stack_counter > 0:
grammar.call_stack__.pop()
stack_counter -= 1
raise pe
except KeyboardInterrupt as ctrlC:
lc = line_col(grammar.document_lbreaks__, location)
grammar.history__.append(
HistoryRecord(grammar.call_stack__, None,
grammar.document__[location:],
lc, [Error('KeyboardInterrupt (Ctrl-C)', location)]))
raise ctrlC
wrap_call_up(self, node, location, location_)
grammar.moving_forward__ = False
while stack_counter > 0:
grammar.call_stack__.pop()
stack_counter -= 1
return node, location_
@dataclass
class TracingData:
parser: Forward
grammar: Grammar
location: int
history_pointer: int
call_stack_pointer: int
call_stack_pointer0 : int
def fw_memo(self: Forward, location: int, result: ParsingResult):
grammar = self._grammar
node, location_ = (None, location) if result is None else result
if location in self.visited:
prefix = f"SAPLING: "
elif result is SEED:
prefix = f"SEED: "
node = None
location = location
else:
prefix = f"GROW: "
grammar.call_stack__.append(call_item(self, location, prefix))
record = history_record(self, grammar, node, location, location_, prefix)
grammar.history__.append(record)
grammar.call_stack__.pop()
def fw_init(fwp: 'Forward', location: int)-> TracingData:
grammar = fwp._grammar
history = grammar.history__
return TracingData(fwp,
grammar,
location,
len(history),
len(history[-1].call_stack) if history else 0,
len(grammar.call_stack__))
def fw_loop(locals: TracingData) -> None:
grammar = locals.grammar
if grammar.history__: # don't do anything in case tracer has not been set
grammar.call_stack__.extend(grammar.history__[-1].call_stack[locals.call_stack_pointer:])
locals.call_stack_pointer = len(grammar.call_stack__)
locals.history_pointer = len(grammar.history__)
else:
assert False, "Does this ever happen?"
def fw_done(locals: TracingData, result: ParsingResult) -> None:
node, location = result
if node is not None:
grammar = locals.grammar
grammar.history__ = grammar.history__[:locals.history_pointer]
grammar.call_stack__ = grammar.call_stack__[:locals.call_stack_pointer0]
wrap_call_up(locals.parser, node, locals.location, location, force=True)
#######################################################################
#
# Adding and removing tracers
#
#######################################################################
[docs]
def set_tracer(parsers: Union[Grammar, Parser, Iterable[Parser]], tracer: Optional[ParseFunc]):
"""Adds or removes a tracing function to (or from) a single parser, a set of
parsers or all parsers in a grammar.
:param parsers: the parsers or single parser or grammar-object containing
parsers where the ``tracer`` shall be added or removed.
:param tracer: a tracer function or ``None``. If ``None`` any existing
tracer will be removed. If not None, tracer must be a parsing function.
It is up to the tracer to call the original parsing function
(``self._parse()``).
"""
if isinstance(parsers, Grammar):
if tracer is None:
parsers.history_tracking__ = False
parsers.resume_notices__ = False
parsers = parsers.all_parsers__
elif isinstance(parsers, Parser):
parsers = [parsers]
if parsers:
pivot = next(iter(parsers))
# assert all(pivot._grammar == parser._grammar for parser in parsers)
if tracer is None:
pivot._grammar.history_tracking__ = False
pivot._grammar.resume_notices__ = False
else:
pivot._grammar.history_tracking__ = True
pivot._grammar.resume_notices__ = True
for parser in parsers:
parser.set_proxy(tracer)
if isinstance(parser, Forward):
parser.set_fwtracer(fw_memo, fw_init, fw_loop, fw_done)
[docs]
def resume_notices_on(grammar: Grammar):
"""Turns resume-notices as well as history tracking on!"""
# grammar.history_tracking__ = True
grammar.resume_notices__ = True
set_tracer(grammar, trace_history)
[docs]
def resume_notices_off(grammar: Grammar):
"""Turns resume-notices as well as history tracking off!"""
set_tracer(grammar, None)