# trace.py - tracing of the parsing process (for debugging) and
# interrupting long running parser processes.
#
# Copyright 2018 by Eckhart Arnold (arnold@badw.de)
# Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""
Module ``trace`` provides trace-debugging functionality for the
parser. The tracers are added or removed via monkey patching to
all or some parsers of a grammar and trace the actions
of these parsers, making use of the `call_stack__`, `history__`,
`moving_forward__` and `most_recent_error__`-hooks in the
Grammar-object.
This functionality can be used for several purposes:
1. "live" or "breakpoint"-debugging (not implemented)
2. recording of parsing history and "post-mortem"-debugging,
implemented here and in module :py:mod:`log`
3. Interrupting long running parser processes by polling
a threading.Event or multiprocessing.Event once in a while
"""
# TODO: Add Trace-Parser-class
from __future__ import annotations
from typing import Tuple, Optional, List, Iterable, Union, cast
try:
import cython
except ImportError:
import DHParser.externallibs.shadow_cython as cython
from DHParser.error import Error, RESUME_NOTICE, RECURSION_DEPTH_LIMIT_HIT
from DHParser.nodetree import Node, REGEXP_PTYPE, TOKEN_PTYPE, WHITESPACE_PTYPE, ZOMBIE_TAG
from DHParser.log import HistoryRecord, NONE_NODE
from DHParser.parse import Grammar, Parser, ParserError, ParseFunc, ContextSensitive, \
UnaryParser, ERR, PARSER_PLACEHOLDER
from DHParser.toolkit import line_col
__all__ = ('trace_history', 'set_tracer', 'resume_notices_on', 'resume_notices_off')
#######################################################################
#
# Adding and removing tracers
#
#######################################################################
[docs]
def set_tracer(parsers: Union[Grammar, Parser, Iterable[Parser]], tracer: Optional[ParseFunc]):
    """Adds or removes a tracing function to (or from) a single parser, a set of
    parsers or all parsers in a grammar.

    Parsers whose ``ptype`` is ``':Forward'`` are skipped and never receive
    the proxy.

    :param parsers: the parsers or single parser or grammar-object containing
        parsers where the ``tracer`` shall be added or removed. Any iterable
        is accepted, including one-shot iterators and generators.
    :param tracer: a tracer function or ``None``. If ``None`` any existing
        tracer will be removed. If not None, tracer must be a parsing function.
        It is up to the tracer to call the original parsing function
        (``self._parse()``).
    """
    if isinstance(parsers, Grammar):
        if tracer is None:
            # Removing the tracer from a whole grammar also switches off
            # the grammar-wide tracing flags.
            parsers.history_tracking__ = False
            parsers.resume_notices__ = False
        parser_list = list(parsers.all_parsers__)
    elif isinstance(parsers, Parser):
        parser_list = [parsers]
    else:
        # Materialise the iterable once: it is traversed several times below,
        # which would exhaust a generator before any parser gets patched.
        parser_list = list(parsers)

    if not parser_list:
        return

    pivot = parser_list[0]
    # All parsers must belong to one and the same grammar.
    assert all(pivot._grammar == parser._grammar for parser in parser_list)
    if tracer is not None:
        pivot._grammar.history_tracking__ = True
    for parser in parser_list:
        if parser.ptype != ':Forward':
            parser.set_proxy(tracer)
#######################################################################
#
# History-Recording for post-mortem debugging
#
#######################################################################
def symbol_name(parser: Parser, grammar: Grammar) -> str:
    """Returns a display name for ``parser``.

    Context-sensitive parsers are rendered via ``str(parser)``, all other
    parsers by their ``node_name``. If the resulting name starts with a
    colon, it is prefixed with the ``pname`` of the symbol that
    ``grammar.associated_symbol__()`` yields for the parser, followed
    by ``'->'``.
    """
    if isinstance(parser, ContextSensitive):
        label = str(parser)
    else:
        label = parser.node_name
    if label[:1] != ':':
        return label
    owner = grammar.associated_symbol__(parser).pname
    return owner + '->' + label
def result_changed(node, history) -> bool:
    """Tells whether the match/no-match status differs from the last record.

    The current result counts as "no match" if ``node`` is ``None``; the
    last history record counts as "no match" if its node is ``NONE_NODE``.
    With an empty history there is nothing to compare to, so ``False``
    is returned.
    """
    if not history:
        return False
    previous_failed = history[-1].node is NONE_NODE
    current_failed = node is None
    return previous_failed != current_failed
def call_item(parser: Parser, location: int, prefix: str = '') -> Tuple[str, int]:
    """Returns the call-stack entry ``(label, location)`` for ``parser``.

    For parsers whose ``node_name`` is ``REGEXP_PTYPE``, ``TOKEN_PTYPE``,
    ``":Retrieve"`` or ``":Pop"`` the label is built from ``parser.repr``;
    for all other parsers from ``parser.pname`` or, if that is empty,
    ``parser.node_name``.

    :param parser: the parser that is being called.
    :param location: the location that is stored alongside the label.
    :param prefix: text that is put in front of the label.
    :return: a tuple ``(label, location)``.
    """
    if parser.node_name in (REGEXP_PTYPE, TOKEN_PTYPE, ":Retrieve", ":Pop"):
        # A blank is used in place of an empty prefix to avoid ':' as first
        # char. (The former ``' ' or prefix`` always yielded ' ' and thus
        # silently discarded any prefix.)
        return f"{prefix or ' '}{parser.repr}", location
    return f"{prefix}{parser.pname or parser.node_name}", location
def history_record(parser: Parser, grammar: Grammar,
                   node: Node,
                   location: int,
                   location_: int,
                   prefix: str = '') -> HistoryRecord:
    """Builds the :py:class:`HistoryRecord` for one parser call.

    The record carries a fresh node (same name as ``node``, content
    ``document[location:location_]``, positioned at ``location``), or
    ``None`` if ``node`` is falsy, as well as the document text from
    ``location_`` onward, the line and column of ``location`` and the
    errors that ``grammar.tree__.error_nodes`` holds for ``node``.

    Side effect: if the parser's ``node_name`` starts with a colon, the
    topmost entry of ``grammar.call_stack__`` is overwritten with
    ``prefix`` plus the parser's ``pname`` or, lacking that, its
    string representation.
    """
    source = grammar.document__
    if node:
        snapshot = Node(node.name, source[location:location_]).with_pos(location)
    else:
        snapshot = None
    position = line_col(grammar.document_lbreaks__, location)
    node_errors = grammar.tree__.error_nodes.get(id(node), [])

    if parser.node_name[0:1] == ':':
        label = f"{prefix}{parser.pname}" if parser.pname else f"{prefix}{parser} "
        grammar.call_stack__[-1] = (label, location)

    return HistoryRecord(grammar.call_stack__, snapshot, source[location_:],
                         position, node_errors)
@cython.locals(location_=cython.int, delta=cython.int, cs_len=cython.int,
               i=cython.int, L=cython.int)
def trace_history(self: Parser, location: cython.int) -> Tuple[Optional[Node], cython.int]:
    """Tracer that wraps ``self._parse()`` and records the parsing history
    in ``grammar.history__``.

    The function proceeds in the following stages:

    1. If ``grammar.history_tracking__`` is off, it merely passes through:
       a negative location is answered from ``self.visited``, any other
       location by calling ``self._parse()``.
    2. A negative location is treated as a memo-hit: a "RECALL: "-record is
       appended to the history and the entry of ``self.visited`` is returned.
    3. If ``grammar.most_recent_error__`` is set and ``location`` has
       reached or passed its error position, a resume- or skip-notice is
       created and a history record for the error is appended. If the error
       code is ``RECURSION_DEPTH_LIMIT_HIT``, the error's node is returned
       right away instead.
    4. The parser is pushed onto ``grammar.call_stack__`` and
       ``self._parse()`` is called. ``ParserError`` and
       ``KeyboardInterrupt`` are recorded in the history and re-raised.
    5. Depending on the parser and the state of the history, a history
       record for the result is appended; finally, the call stack entries
       that were pushed in stage 4 are popped again.

    :param self: the parser to which this tracer has been attached.
    :param location: the location passed on to ``self._parse()``; a
        negative value signals a memo-hit.
    :return: the tuple ``(node, location)`` as returned by the parser or
        as taken from ``self.visited``.
    :raises ParserError: re-raised if raised by ``self._parse()``.
    :raises KeyboardInterrupt: re-raised if raised during ``self._parse()``.
    """
    # NOTE(review): the block nesting below was reconstructed from a copy of
    # the source that had lost its indentation - confirm against upstream.
    grammar = self._grammar  # type: Grammar
    if not grammar.history_tracking__:
        # pass-through mode: no recording at all
        if location < 0: return self.visited[-location]
        try:
            node, location = self._parse(location)  # <===== call to the actual parser!
        except ParserError as pe:
            raise pe
        return node, location
    if location < 0:  # a negative location signals a memo-hit
        location = -location
        # push a temporary call stack entry just for the sake of the record
        grammar.call_stack__.append(call_item(self, location, "RECALL: "))
        node, location_ = self.visited[location]
        record = history_record(self, grammar, node, location, location_, "RECALL: ")
        grammar.history__.append(record)
        grammar.call_stack__.pop()
        return node, location_
    mre: Optional[ParserError] = grammar.most_recent_error__
    if mre is not None and location >= mre.error.pos:
        # add resume notice (mind that skip notices are added by
        # `parse.MandatoryElementsParser.mandatory_violation()`)
        if mre.error.code == RECURSION_DEPTH_LIMIT_HIT:
            return mre.node, location
        # the error is consumed here, so that it will be reported only once
        grammar.most_recent_error__ = None
        errors = [mre.error]  # type: List[Error]
        text_ = grammar.document__[mre.error.pos:]
        orig_lc = line_col(grammar.document_lbreaks__, mre.error.pos)
        # snippets are abbreviated to 7 characters plus '...' if longer than 10
        orig_snippet= text_ if len(text_) <= 10 else text_[:7] + '...'
        # orig_snippet = orig_rest if len(orig_rest) <= 10 else orig_rest[:7] + '...'
        target_lc = line_col(grammar.document_lbreaks__, location)
        target_text_ = grammar.document__[location:]
        target_snippet = target_text_ if len(target_text_) <= 10 else target_text_[:7] + '...'
        # target = text if len(text) <= 10 else text[:7] + '...'
        if mre.first_throw:
            # origin = callstack_as_str(mre.callstack_snapshot, depth=3)
            # resumer = callstack_as_str(grammar.call_stack__, depth=3)
            origin = symbol_name(mre.parser, grammar)
            resumer = symbol_name(self, grammar)
            notice = Error(  # resume notice
                'Resuming from {} at {}:{} {} with {} at {}:{} {}'
                .format(origin, *orig_lc, repr(orig_snippet),
                        resumer, *target_lc, repr(target_snippet)),
                location, RESUME_NOTICE)
        else:
            origin = symbol_name(self, grammar)
            notice = Error(  # skip notice
                'Skipping from {}:{} {} within {} to {}:{} {}'
                .format(*orig_lc, repr(orig_snippet), origin,
                        *target_lc, repr(target_snippet)),
                location, RESUME_NOTICE)
        # the notice always enters the history, but is added to the tree
        # only if resume notices have been switched on
        if grammar.resume_notices__:
            grammar.tree__.add_error(mre.node, notice)
        errors.append(notice)
        # use the call stack snapshot taken when the error was first thrown,
        # if there is one, otherwise fall back to the current call stack
        grammar.history__.append(HistoryRecord(
            getattr(mre, 'callstack_snapshot', grammar.call_stack__), mre.node, text_,
            line_col(grammar.document_lbreaks__, mre.error.pos), errors))
    grammar.call_stack__.append(call_item(self, location))
    stack_counter = 1  # number of call stack entries to be popped later
    if self.node_name in (":Lookbehind", ":NegativeLookbehind"):
        # lookbehind-parsers get an extra entry for the parser they contain
        grammar.call_stack__.append((' ' + cast(UnaryParser, self).parser.repr, location))
        stack_counter += 1
    grammar.moving_forward__ = True
    try:
        #####################################################################################
        node, location_ = self._parse(location)  # <===== call to the actual parser!
        #####################################################################################
    except ParserError as pe:
        if pe.first_throw:
            # remember the call stack as it was when the error occurred
            pe.callstack_snapshot = grammar.call_stack__.copy()
            grammar.most_recent_error__ = pe
        if self == grammar.start_parser__ and grammar.most_recent_error__:
            # the error has travelled up to the start parser: record it
            fe = grammar.most_recent_error__  # type: ParserError
            lc = line_col(grammar.document_lbreaks__, fe.error.pos)
            # TODO: get the call stack from when the error occurred, here
            nd = fe.node
            grammar.history__.append(
                HistoryRecord(grammar.call_stack__, nd,
                              grammar.document__[fe.location + nd.strlen():],
                              lc, [fe.error]))
        # clean up the call stack before passing the error on
        while stack_counter > 0:
            grammar.call_stack__.pop()
            stack_counter -= 1
        raise pe
    except KeyboardInterrupt as ctrlC:
        # record the interruption; the call stack is not popped in this case
        lc = line_col(grammar.document_lbreaks__, location)
        grammar.history__.append(
            HistoryRecord(grammar.call_stack__, None,
                          grammar.document__[location:],
                          lc, [Error('KeyboardInterrupt (Ctrl-C)', location)]))
        raise ctrlC
    # Don't track returning parsers except in case an error has occurred!
    if ((self.node_name != WHITESPACE_PTYPE)
        and (grammar.moving_forward__
             or (not self.disposable
                 and (node and grammar.history__[-1].node))
             or result_changed(node, grammar.history__))):
        # record history
        # TODO: Make dropping insignificant whitespace from history configurable
        record = history_record(self, grammar, node, location, location_)
        cs_len = len(record.call_stack)
        # the record is appended only if the parser failed or if it differs
        # from the last record in position or call stack, or if self is the
        # start parser
        if (not grammar.history__ or not node
                or record.line_col != grammar.history__[-1].line_col
                or record.call_stack != grammar.history__[-1].call_stack[:cs_len]
                or self == grammar.start_parser__):
            if len(record.call_stack) >= 2 and \
                    record.call_stack[-2][0] in (":Lookbehind", ":NegativeLookbehind"):
                # inside a lookbehind the text is taken from the reversed document
                record.text = grammar.reversed__[len(grammar.document__) - location_:]
            if not grammar.moving_forward__ \
                    and not any(tag == ':Lookahead' \
                                for tag, _ in grammar.history__[-1].call_stack):
                # when returning, the new record replaces the last one,
                # unless the last one stems from within a lookahead
                grammar.history__.pop()
            grammar.history__.append(record)
        # elif node and node.name == ZOMBIE_TAG:
        #     if id(node) in grammar.tree__.error_nodes:
        #         lc = line_col(grammar.document_lbreaks__, location)
        #         record = HistoryRecord(grammar.call_stack__, node, doc[location_:], lc,
        #                                self.tree__.error_nodes[id(node)])
        #         grammar.history__.append(record)
    grammar.moving_forward__ = False
    # remove the call stack entries that were pushed before the parser call
    while stack_counter > 0:
        grammar.call_stack__.pop()
        stack_counter -= 1
    return node, location_
[docs]
def resume_notices_on(grammar: Grammar):
    """Switches resume-notices on and attaches :py:func:`trace_history`
    as tracer to all parsers of ``grammar``, which also enables
    history tracking."""
    grammar.resume_notices__ = True
    set_tracer(parsers=grammar, tracer=trace_history)
[docs]
def resume_notices_off(grammar: Grammar):
    """Removes the tracer from all parsers of ``grammar``, which switches
    off resume-notices as well as history tracking."""
    set_tracer(parsers=grammar, tracer=None)
#######################################################################
#
# Interrupt-Polling
#
#######################################################################