# Source code for pipeline

# pipeline.py - Processing-pipelines for consecutive stages of tree/data-transformations
#
# Copyright 2023  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences and Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.

"""
Module ``pipeline`` implements support for processing-pipelines, i.e. for
connecting successive stages of tree transformations (called
"junctions") to form processing pipelines. Processing pipelines have
one starting point, the source-document, but can have one or more
end points. For example, if the source is a text-document, the
end points can be an HTML document for the online-presentation
and a LaTeX-document to produce a printed version.

Each junction is a triple of the name of the source-stage, a factory
for the transformation-function and the name of the destination-stage.
Pipelines are defined by the set of junctions, from which the paths
connecting the source-point to the end-points are derived
algorithmically.
"""

from __future__ import annotations

import functools

from functools import partial
from typing import Set, Union, Any, Dict, List, Tuple, Iterable, Optional, Sequence, NamedTuple, \
    Callable

from DHParser.compile import compile_source, process_tree, CompilerFactory
from DHParser.configuration import get_config_value
from DHParser.error import Error, has_errors, is_fatal, FATAL, CANCELED
from DHParser.nodetree import RootNode, Node
from DHParser.parse import Grammar, ParserFactory, Parser
from DHParser.preprocess import PreprocessorFactory, PreprocessorFunc, Tokenizer, \
    gen_find_include_func, preprocess_includes, make_preprocessor, chain_preprocessors, \
    DeriveFileNameFunc, PreprocessorResult
from DHParser.toolkit import ThreadLocalSingletonFactory, deprecation_warning, deprecated, \
    get_annotations, CancelQuery
from DHParser.trace import resume_notices_on, set_tracer, trace_history
from DHParser.transform import TransformerFunc, TransformerFactory, transformer, TransformationDict

# Public interface of this module, i.e. the names exported by
# ``from DHParser.pipeline import *``.
__all__ = ('Junction',
           'PipelineResult',
           'end_points',
           'connection',
           'as_paths',
           'PIPE_CHARS',
           'pp_paths',
           'PipeTree',
           'as_graph',
           'extract_data',
           'run_pipeline',
           'full_pipeline',
           'PseudoJunction',
           'create_preprocess_junction',
           'create_parser_junction',
           'create_compiler_junction',
           'create_evaluation_junction',
           'create_junction')


class Junction(NamedTuple):
    """A junction is a triple of the name of the source-stage, a
    factory-function that returns the actual transformation function
    and the name of the destination-stage.

    The factory (not the transformation itself) is stored, so that
    :py:func:`run_pipeline` can obtain the transformation by calling
    ``junction.factory()`` right before it is applied."""
    src: str  # name of the stage the transformation is applied to
    factory: Union[ParserFactory, CompilerFactory, TransformerFactory]  # returns the transformation
    dst: str  # name of the stage the transformation produces
    __module__ = __name__  # needed for cython compatibility
# compilation and postprocessing result: # Dict: target-stage-name -> (result, errors) PipelineResult = Dict[str, Tuple[Union[RootNode, Any], List[Error]]]
def end_points(junctions: Iterable[Junction]) -> Set[str]:
    """Returns all "final" destination stages, i.e. destinations that are
    not a source of another junction.

    :param junctions: any iterable (including one-shot iterators) of
        junctions. Old-style plain triples (src, factory, dst) are accepted
        as well.
    :returns: the set of names of the end stages. An empty input yields an
        empty set.
    """
    # The junctions are traversed twice below, so a generator or other
    # one-shot iterable must be materialized first.
    junctions = list(junctions)
    try:
        sources = {j.src for j in junctions}
    except AttributeError:
        # accept old-style triples as well
        junctions = [Junction(*t) for t in junctions]
        sources = {j.src for j in junctions}
    return {j.dst for j in junctions if j.dst not in sources}
def connection(junctions: Iterable[Junction], target: str, origin: str = "CST") -> List[Junction]:
    """Collects the chain of junctions leading from ``origin`` to ``target``.

    The chain is traced backwards: starting with ``target``, a junction whose
    destination is the current stage is picked, and its source becomes the
    next stage to look for. Every junction is used at most once.

    :param junctions: the junctions to choose from.
    :param target: the stage that shall be reached.
    :param origin: the stage to start from (default "CST").
    :returns: the junctions in processing order, i.e. from ``origin`` to
        ``target``. The list is empty if ``target == origin``.
    :raises ValueError: if no chain of junctions connects the two stages.
    """
    pool = set(junctions)
    chain = []
    stage = target
    while stage != origin:
        found = next((j for j in pool if j.dst == stage), None)
        if found is None:
            raise ValueError(f'No connection from {origin} to {stage}!')
        chain.append(found)
        stage = found.src
        pool.remove(found)
    return chain[::-1]
def as_paths(junctions: Iterable[Junction]) -> Dict[str, List[str]]:
    """Returns a dictionary that maps each end point to the path from the
    start (usually "CST") to that end point.

    :param junctions: any iterable (including one-shot iterators) of
        junctions.
    :returns: a dictionary mapping every end stage (a destination that is
        not the source of any junction) to the list of stage names from the
        origin stage to that end stage, both included. An empty input yields
        an empty dictionary.
    :raises ValueError: if the junctions have no origin stage or contain a
        cycle on the way back from an end point.
    """
    # The junctions are traversed several times, so one-shot iterables
    # must be materialized first.
    junctions = list(junctions)
    if not junctions:
        return {}
    srcs = {j.src for j in junctions}
    dst_src = {j.dst: j.src for j in junctions}
    if not (srcs - dst_src.keys()):
        raise ValueError('No origin stage found in the set of junctions! (There '
                         'seems to be a cycle somewhere...)')
    paths = {}
    for endpoint in dst_src.keys() - srcs:
        path = [endpoint]
        seen = {endpoint}
        # Walk backwards until reaching a stage that is nobody's destination,
        # i.e. the origin stage.
        while path[-1] in dst_src:
            predecessor = dst_src[path[-1]]
            if predecessor in seen:
                raise ValueError(f'Cycle detected on the path to end point "{endpoint}"!')
            seen.add(predecessor)
            path.append(predecessor)
        path.reverse()
        paths[endpoint] = path
    return paths
PIPE_CHARS = ' ┊┆|│├─└┃┣┗━' # {' ', '│', '├', '└', '─'} def pp_paths(paths: Union[Dict[str, List[str]], Set[Junction]], vertical = '│ ', bifurcation = '├─') -> str: if isinstance(paths, (Set, frozenset)): paths = as_paths(paths) paths = list(paths.values()) paths.sort(reverse=True) paths.sort(key=len, reverse=True) l = [] for path in paths: k = 0 i = 0 while k < len(path) and i < len(l): if l[i][:1] in PIPE_CHARS or path[k] != l[i]: if i > 0 and l[i - 1][:1] not in PIPE_CHARS: l[i] = (bifurcation + l[i]) if l[i][:1] not in PIPE_CHARS else (vertical + l[i]) else: l[i] = vertical + l[i] i += 1 else: k += 1 i += 1 while k < len(path): l.append(path[k]) k += 1 return '\n'.join(l)
class PipeTree(NamedTuple):
    """A node of the stage-tree produced by :py:func:`as_graph`.

    ``src`` is the name of the stage, ``desc`` the list of subtrees for the
    stages that directly follow it, and ``depth`` the height of the subtree
    as computed by :py:func:`as_graph`. Converting a ``PipeTree`` with
    ``str()`` yields an outline where each level is indented by one more
    blank than its parent."""
    src: str
    desc: List[PipeTree]
    depth: int

    def __repr__(self):
        return f"PipeTree({self.src}, {self.desc}, {self.depth})"

    def __str__(self):
        # Indent every line of every child's rendering by one blank.
        indented = [" " + line
                    for child in self.desc
                    for line in str(child).split('\n')]
        return '\n'.join([self.src] + indented)
def as_graph(junctions: Iterable[Junction]) -> PipeTree:
    """Returns the junctions as a directed graph, i.e. as a tree of
    :py:class:`PipeTree` nodes rooted at the single origin stage.

    Children are ordered by depth and, for equal depth, by stage name.
    Leaves have depth 1; a parent's depth is the maximum depth of its
    children plus one.

    :param junctions: any iterable (including one-shot iterators) of
        junctions that form a tree.
    :raises ValueError: if no junctions are given, if there is more than one
        origin stage, if there is no origin stage at all, or if a cycle is
        reachable from the origin stage.
    """
    # TODO: unit-test sometimes fails!!!
    # The junctions are traversed several times, so one-shot iterables
    # must be materialized first.
    junctions = list(junctions)
    if not junctions:
        raise ValueError('Cannot build a graph from an empty set of junctions!')
    jdict = dict()  # source stage -> set of junctions starting there
    for j in junctions:
        jdict.setdefault(j.src, set()).add(j)
    srcs = set(jdict.keys())
    dsts = {j.dst for j in junctions}
    diff = srcs - dsts
    if len(diff) > 1:
        raise ValueError('More than one origin stage within the set of junctions: '
                         + str(diff))
    elif len(diff) < 1:
        raise ValueError('No origin stage found in the set of junctions! (The '
                         'connected junctions should form a tree, but there '
                         'seems to be a cycle somewhere...)')
    src = diff.pop()

    def subtree(j: Junction, ancestors: frozenset) -> PipeTree:
        # ``ancestors`` holds the stages on the way from the origin to ``j``;
        # meeting one of them again means the junctions contain a cycle,
        # which would otherwise lead to unbounded recursion.
        if j.dst in ancestors:
            raise ValueError(f'Cycle detected at stage "{j.dst}" in the set of junctions!')
        if j.dst not in jdict:
            return PipeTree(j.dst, [], 1)
        ancestors = ancestors | {j.dst}
        desc = [subtree(d, ancestors) for d in jdict[j.dst]]
        # Two stable sorts: by depth and, for equal depth, by name, so that
        # the arbitrary iteration order of the set does not leak into the result.
        desc.sort(key=lambda x: x.src)
        desc.sort(key=lambda x: x.depth)
        return PipeTree(j.dst, desc, max(child.depth for child in desc) + 1)

    return subtree(Junction(src, None, src), frozenset())
def extract_data(tree_or_data: Union[RootNode, Node, Any]) -> Any:
    """Unwraps the payload of a pipeline result.

    If ``tree_or_data`` is a :py:class:`RootNode`, its ``data`` attribute is
    returned. Any other object, including a ``Node`` that is not a
    ``RootNode``, is passed through unchanged."""
    return tree_or_data.data if isinstance(tree_or_data, RootNode) else tree_or_data
def run_pipeline(junctions: Set[Junction],
                 source_stages: Dict[str, RootNode],
                 target_stages: Set[str],
                 *, cancel_query: Optional[CancelQuery] = None) -> PipelineResult:
    """
    Runs all the intermediary compilation-steps that are necessary to produce
    the "target-stages" from the given "source-stages". Here, each source-stage
    consists of a name for that stage, say "AST", and a node-tree that
    represents the data at this stage of the processing pipeline. In the
    target-stage, the data can be a node-tree or data of any other kind.

    The stages are connected through chains of junctions, where a junction is
    essentially a function that transforms a tree from one particular stage
    (identified by its name) to another stage, again identified by its name.

    The stage names "AST" and "CST" are matched case-insensitively.

    :param junctions: the junctions from which the necessary chains are
        picked. They are accessed by index only, so plain triples presumably
        work as well.
    :param source_stages: mapping of stage names to the trees that are
        already available.
    :param target_stages: names of the stages to be produced.
    :param cancel_query: optional callable. It is handed to transformations
        that have a ``cancel_query``/``cancel_query__`` attribute. If it
        returns a true value after a junction has been applied, a CANCELED
        error is added to the processed tree.
    :returns: a dictionary mapping every stage that has been reached
        (including the source stages and the intermediary stages) to a tuple
        of its data (see :py:func:`extract_data`) and its list of errors.
        If processing stops early because of a fatal error, stages that have
        not been reached are missing.
    :raises AssertionError: if a required stage cannot be produced by any of
        the junctions, or if a stage name does not match the junction that
        is applied (except when called from grammar tests).
    :raises ValueError: if a stage that must be processed further does not
        hold a RootNode.

    TODO: Parallelize processing of junctions? Requires copying a lot of tree-data!?
    """
    import copy

    def cmp_junctions(a, b) -> int:
        # Order within one step: a junction whose destination is the source of
        # the other one comes first.
        # NOTE(review): the fallback branch returns -1 or 0 but never 1, so this
        # is not a consistent total order for unrelated junctions.
        if a[-1] == b[0]:
            return -1
        if b[-1] == a[0]:
            return 1
        if b[-1] > a[-1]:
            return -1
        else:
            return 0

    def verify_stage(given_stage, junction, field, further_info=''):
        # Checks case-insensitively that ``given_stage`` equals the junction's
        # source stage (field == 0) or destination stage (field == 2).
        if not given_stage:
            return  # verification is considered as turned off
        assert field in (0, 2)
        expected_stage = junction[field]
        stage_type = 'source stage' if field == 0 else 'target stage'
        if given_stage.lower() != expected_stage.lower():
            if isinstance(junction[1], ThreadLocalSingletonFactory):
                func = junction[1].class_or_factory.__name__
            else:
                func = junction[1].__name__
            error_msg = (f'Expected {stage_type} "{expected_stage}" but found '
                         f'"{given_stage}" when applying {junction[0]}->{junction[2]} ({func})! '
                         'Possible causes: a) wrong stage name specified in junction b) stage name not '
                         f'updated by compilation-function c) internal error of DHParser. \n{further_info}')
            # When the call originates from grammar tests run via a "get_..."
            # call, only warn instead of failing; otherwise raise.
            import traceback
            stack = traceback.extract_stack()
            for call in stack:
                if call.line and call.line.find('run_grammar_tests') >= 0:
                    if call.line.find('get_') >= 0:
                        # Be kind and backwards-compatible with old code
                        deprecation_warning(f'Your transformation {junction[0]}->{junction[2]}: '
                                            f"{func} failed to update RootNode.stage "
                                            f'with the name of the target stage: "{junction[2]}"! '
                                            f'Future versions of DHParser might fail right here.')
                        print(error_msg)
                        break
            else:
                raise AssertionError(error_msg)

    def normalize_name(name: str) -> str:
        # 'AST' and 'CST' are normalized to upper case; other names stay untouched.
        NAME = name.upper()
        return NAME if NAME in ('AST', 'CST') else name

    def normalize_junction(j: Junction):
        # NOTE(review): unlike the CST branch, the AST branch does not
        # normalize the destination name.
        SRC = j[0].upper()
        if SRC == 'CST':
            return Junction('CST', j[1], normalize_name(j[2]))
        elif SRC == 'AST' and SRC != j[0]:
            return Junction('AST', j[1], j[2])
        else:
            return j

    # Index the junctions by their (normalized) destination stage. If several
    # junctions lead to the same destination, the last one wins.
    t_to_j = {normalize_name(j[-1]): normalize_junction(j) for j in junctions}
    target_stages = {normalize_name(t) for t in target_stages}
    source_stages = {normalize_name(s): source_stages[s] for s in source_stages}

    # Walk backwards from the targets towards the source stages. Each entry
    # of ``steps`` is the list of junctions that produce the stages required
    # by the previous round.
    steps = []
    targets = target_stages.copy()
    already_reached = targets | source_stages.keys()
    while targets:
        try:
            j_sequence = [t_to_j[t] for t in targets if t not in source_stages]
            j_sequence.sort(key=functools.cmp_to_key(cmp_junctions))
            steps.append(j_sequence)
        except KeyError as e:
            raise AssertionError(f"{e.args[0]} is not a valid target.")
        targets = {j[0] for j in steps[-1] if j[0] not in already_reached}
        already_reached |= targets
        # Drop the junctions of the newest round from all previous rounds, so
        # that each junction only appears in the round that is executed first
        # (the rounds are executed in reverse order below).
        for step in steps[:-1]:
            for j in steps[-1]:
                try:
                    step.remove(j)
                except ValueError:
                    pass
    # NOTE(review): ``already_reached`` was initialized with all target stages,
    # so this check cannot fail; unreachable targets surface as the
    # AssertionError above instead.
    if not (target_stages <= already_reached):
        raise ValueError(f'Target-stages: {target_stages - already_reached} '
                         f'cannot be reached with junctions: {junctions}.')

    # Stages that serve as a source only once and are not themselves targets
    # are "disposable": their tree is handed on without a deep copy.
    sources = [j[0] for step in steps for j in step]
    disposables = {s for s in set(sources) if s not in target_stages and sources.count(s) <= 1}
    steps.reverse()
    results: Dict[str, Any] = source_stages.copy()
    errata: Dict[str, List[Error]] = {s: source_stages[s].errors_sorted for s in source_stages}
    cancel = False
    for step in steps:
        for junction in step:
            t = junction[-1]
            if t not in results:
                s = junction[0]
                tree = results[s] if s in disposables else copy.deepcopy(results[s])
                if s not in target_stages:
                    # One use of ``s`` is consumed; once at most one use is left,
                    # the last consumer may take the tree without copying.
                    sources.remove(s)
                    if sources.count(s) <= 1:
                        disposables.add(s)
                if tree is None:
                    results[t] = None
                    errata[t] = errata[s]
                else:
                    if not isinstance(tree, RootNode):
                        raise ValueError(f'Object in stage "{s}" is not a tree (RootNode) '
                                         f'but a {type(tree)} '
                                         f'and, therefore, cannot be processed to {t}')
                    verify_stage(tree.stage, junction, 0)
                    transformation = junction[1]()
                    if hasattr(transformation, 'cancel_query'):
                        transformation.cancel_query = cancel_query
                    elif hasattr(transformation, 'cancel_query__'):
                        transformation.cancel_query__ = cancel_query
                    results[t] = process_tree(transformation, tree)  # TODO: pass cancel query, here
                    errata[t] = copy.copy(tree.errors_sorted)
                    # NOTE(review): the CANCELED error is added after ``errata[t]``
                    # has been copied, so it does not show up in the errors of ``t``.
                    if cancel_query is not None and cancel_query():
                        tree.new_error(tree, "Pipeline-processing canceled!", CANCELED)
                    if is_fatal(tree.error_flag):
                        cancel = True
                        break
                    if tree.stage == s:
                        # tree stage hasn't been set by the processing function
                        tree.stage = junction[2]
                    else:
                        verify_stage(tree.stage, junction, 2)
        if cancel:
            break
    return {t: (extract_data(results[t]), errata[t]) for t in results.keys()}
def full_pipeline(source: str,
                  preprocessor_factory: PreprocessorFactory,
                  parser_factory: ParserFactory,
                  junctions: Set[Junction],
                  target_stages: Set[str],
                  start_parser: Union[str, Parser] = "root_parser__",
                  *, cancel_query: Optional[CancelQuery] = None) -> PipelineResult:
    """Runs a processing pipeline starting from the source-code.

    In contrast to :py:func:`run_pipeline`, which starts from an existing
    tree-stage (typically the concrete syntax-tree, CST), this function first
    preprocesses and parses the source-document. It then post-processes the
    resulting tree into the given target stages.

    If parsing already produces fatal errors, every requested target stage
    is mapped to the (unprocessed) syntax tree and the parser's messages.
    If fatal errors occur later in the pipeline, some target stages might not
    be reached and are then missing from the result.
    """
    preprocessor = preprocessor_factory()
    parser = parser_factory()
    root, messages, _ = compile_source(source, preprocessor, parser,
                                       start_parser=start_parser,
                                       cancel_query=cancel_query)
    if not has_errors(messages, FATAL):
        return run_pipeline(junctions, {root.stage: root}, target_stages,
                            cancel_query=cancel_query)
    # Parsing failed fatally: there is nothing to post-process.
    return {stage: (root, messages) for stage in target_stages}
#######################################################################
#
# Helper-functions for creating junctions
#
#######################################################################

# In the following, PARTIAL FUNCTIONS are used rather than local functions,
# because the closures of local functions cannot be pickled!


# Preprocessing-Stage #################################################

# def _preprocess(source, factory) -> Union[str, StringView]:
#     return factory()(source)
class PseudoJunction(NamedTuple):
    """Wrapper for the factory of a stage that does not transform one tree
    into another, i.e. the preprocessing or the parsing stage. Unlike a
    :py:class:`Junction`, it carries no source or destination stage names."""
    factory: Union[PreprocessorFactory, ParserFactory]  # returns the preprocessor or parser
    __module__ = __name__  # needed for cython compatibility
def _matches_type(candidate, expected, name: str) -> bool:
    """Returns True if ``candidate`` is ``expected`` (by identity or
    equality) or if its string representation equals ``name``. The string
    comparison covers annotations that are stored as strings, e.g. in
    modules using ``from __future__ import annotations``."""
    return candidate is expected or candidate == expected or str(candidate) == name


def _preprocessor_factory(prep_func: Union[PreprocessorFunc, Tokenizer],
                          include_regex, comment_regex, derive_file_name,
                          func_type: Optional[type] = None) -> PreprocessorFunc:
    """Builds the preprocessing function of a preprocess-junction.

    The returned function first resolves includes (via
    ``preprocess_includes``) and then applies ``prep_func``. Whether
    ``prep_func`` already is a preprocessor (returning a
    ``PreprocessorResult``) or a tokenizer that must be wrapped with
    ``make_preprocessor()`` is decided by its return annotation or, if it
    has none, by ``func_type``.

    :raises AssertionError: if ``func_type`` contradicts the return
        annotation of ``prep_func``, or if ``prep_func`` has no return
        annotation and ``func_type`` is not given.
    """
    # below, the second parameter must always be the same as Grammar.COMMENT__!
    find_next_include = gen_find_include_func(include_regex, comment_regex, derive_file_name)
    include_prep = partial(preprocess_includes, find_next_include=find_next_include)
    anno = get_annotations(prep_func)
    # Only the lookup of the return annotation is guarded, so that KeyErrors
    # raised elsewhere (e.g. inside make_preprocessor) are not mistaken for a
    # missing annotation.
    try:
        ret_type = anno['return']
    except KeyError:
        assert func_type is not None, \
            "Please specify the kind of preprocessor by passing parameter func_type=Tokenizer " \
            "or func_type=PreprocessorFunc to DHParser.pipeline.create_preprocess_junction()"
        if _matches_type(func_type, PreprocessorFunc, 'PreprocessorFunc'):
            prep = prep_func
        else:
            prep = make_preprocessor(prep_func)
    else:
        if _matches_type(ret_type, PreprocessorResult, 'PreprocessorResult'):
            assert func_type is None or _matches_type(func_type, PreprocessorFunc, "PreprocessorFunc"), \
                f"func_type={func_type} is incompatible with return type PreprocessorResult of " \
                f"parameter prep_func when calling DHParser.pipeline.create_preprocess_junction()"
            prep = prep_func
        else:
            assert func_type is None or _matches_type(func_type, Tokenizer, "Tokenizer"), \
                f"func_type={func_type} is incompatible with return type {ret_type} of " \
                f"parameter prep_func when calling DHParser.pipeline.create_preprocess_junction()"
            prep = make_preprocessor(prep_func)
    return chain_preprocessors(include_prep, prep)


def same_name(name: str) -> str:
    """Default file-name derivation for includes: returns ``name`` unchanged."""
    return name
def create_preprocess_junction(prep_func: Union[PreprocessorFunc, Tokenizer],
                               include_regex, comment_regex,
                               derive_file_name: DeriveFileNameFunc = same_name,
                               func_type: Optional[type] = None) -> PseudoJunction:
    """Creates a pseudo-junction for the preprocessing stage.

    The returned :py:class:`PseudoJunction` holds a
    ``ThreadLocalSingletonFactory`` around a partial application of
    ``_preprocessor_factory`` to the given arguments. A partial is used
    instead of a closure so that the factory remains picklable.
    """
    settings = {'prep_func': prep_func,
                'include_regex': include_regex,
                'comment_regex': comment_regex,
                'derive_file_name': derive_file_name,
                'func_type': func_type}
    thread_safe_factory = ThreadLocalSingletonFactory(partial(_preprocessor_factory, **settings))
    return PseudoJunction(thread_safe_factory)
# Parsing-stage #######################################################

# def _parse_func(parser_factory: Callable, document, start_parser = "root_parser__",
#                 *, complete_match=True):
#     return parser_factory()(document, start_parser, complete_match=complete_match)


def _parser_factory(raw_grammar, python_src='') -> Grammar:
    """Returns the grammar object produced by ``raw_grammar()``, configured
    according to the current configuration values.

    Resume notices are switched on if the config value 'resume_notices' is
    set; otherwise history tracing is switched on if 'history_tracking' is
    set. If the grammar class has an empty ``python_src__`` attribute, it is
    filled in with ``python_src``.
    """
    grammar = raw_grammar()
    if get_config_value('resume_notices'):
        resume_notices_on(grammar)
    elif get_config_value('history_tracking'):
        set_tracer(grammar, trace_history)
    grammar_class = grammar.__class__
    try:
        source_missing = not grammar_class.python_src__
        if source_missing:
            grammar_class.python_src__ = python_src
    except AttributeError:
        pass  # the grammar class does not support ``python_src__``
    return grammar
def create_parser_junction(grammar_class: type) -> PseudoJunction:
    """Creates a pseudo-junction for the parsing stage.

    ``grammar_class`` must be a subclass of :py:class:`Grammar`. It is
    wrapped in a ``ThreadLocalSingletonFactory``, and the returned
    :py:class:`PseudoJunction` holds a partial application of
    ``_parser_factory`` to that wrapped class.
    """
    assert issubclass(grammar_class, Grammar)
    return PseudoJunction(partial(_parser_factory,
                                  raw_grammar=ThreadLocalSingletonFactory(grammar_class)))
# Tree-Processing-Stages ##############################################

# 1. tree-processing with Compiler-class

# def process_template(src_tree: Node, src_stage: str, dst_stage: str,
#                      factory_function) -> Any:
#     """A generic template for tree-transformation-functions."""
#     if isinstance(src_tree, RootNode):
#         assert src_tree.stage == src_stage
#     result = factory_function()(src_tree)
#     if isinstance(result, RootNode):
#         assert result.stage in (src_stage, dst_stage)
#         result.stage = dst_stage
#     return result
def create_compiler_junction(compile_class: CompilerFactory,
                             src_stage: str, dst_stage: str) -> Junction:
    """Creates a junction from ``src_stage`` to ``dst_stage`` whose factory
    is a ``ThreadLocalSingletonFactory`` around ``compile_class``, which may
    be a :py:class:`compile.Compiler` subclass or any other callable.
    """
    assert callable(compile_class)
    return Junction(src=src_stage,
                    factory=ThreadLocalSingletonFactory(compile_class),
                    dst=dst_stage)
# 2. tree-processing with transformation-table

def _make_transformer(src_stage, dst_stage, table) -> TransformerFunc:
    """Binds ``transformer`` to a private copy of ``table`` and to the given
    stage names, and returns the result wrapped in a ``staticmethod``.

    NOTE(review): ``staticmethod`` objects are only directly callable from
    Python 3.10 on."""
    bound = partial(transformer, transformation_table=table.copy(),
                    src_stage=src_stage, dst_stage=dst_stage)
    return staticmethod(bound)


@deprecated("DHParser.pipeline.create_transtable_junction() is deprecated, "
            "because it does not work with lambdas as transformer functions! "
            "Please, use a hardcoded factory function that refers to the transtable "
            "directly instead.")
def create_transtable_junction(table: TransformationDict,
                               src_stage: str, dst_stage: str) -> Junction:
    """Creates a junction from ``src_stage`` to ``dst_stage`` whose factory
    builds a transformer from the transformation-table ``table`` (see
    :py:func:`transform.traverse`).

    TODO: This does not work if table contains functions that cannot be
    pickled (i.e. lambda-functions)!
    """
    assert isinstance(table, dict)
    factory = ThreadLocalSingletonFactory(
        partial(_make_transformer, src_stage, dst_stage, table),
        uniqueID=id(table))
    return Junction(src_stage, factory, dst_stage)


@deprecated("DHParser.pipeline.create_transtable_transition() is deprecated, "
            "because it does not work with lambdas as transformer functions! "
            "Please, use a hardcoded factory function that refers to the transtable "
            "directly instead.")
def create_transtable_transition(*args, **kwargs):
    """Deprecated alias of :py:func:`create_transtable_junction`."""
    return create_transtable_junction(*args, **kwargs)


# 3. tree-processing with evaluation-table

def _make_evaluation(actions, supply_path_arg) -> Callable[[Node], Any]:
    """Returns a function that evaluates a node with the evaluation-table
    ``actions``, via ``Node.evaluate_path`` (starting with the path
    ``[node]``) if ``supply_path_arg`` is true, otherwise via
    ``Node.evaluate``."""
    if supply_path_arg:
        def evaluate_with_path(node: Node) -> Any:
            return node.evaluate_path(actions, [node])
        return evaluate_with_path

    def evaluate_without_path(node: Node) -> Any:
        return node.evaluate(actions)
    return evaluate_without_path
@deprecated("DHParser.pipeline.create_evaluation_junction() is deprecated, "
            "because it does not work with lambdas as transformer functions! "
            "Please, use a hardcoded factory function that refers to the evaluation-table "
            "directly instead.")
def create_evaluation_junction(actions: Dict[str, Callable],
                               src_stage: str, dst_stage: str,
                               supply_path_arg: bool = True) -> Junction:
    """Creates a junction from ``src_stage`` to ``dst_stage`` whose factory
    builds an evaluation function from the evaluation-table ``actions``
    (see :py:meth:`nodetree.Node.evaluate`).

    :param actions: the evaluation-table, which must be a dict.
    :param supply_path_arg: if true, nodes are evaluated with
        ``Node.evaluate_path`` instead of ``Node.evaluate``.
    :returns: the junction.
    """
    assert isinstance(actions, dict)
    # A partial (rather than a closure) keeps the factory picklable.
    make_evaluation = partial(_make_evaluation, actions=actions,
                              supply_path_arg=supply_path_arg)
    factory = ThreadLocalSingletonFactory(make_evaluation, uniqueID=id(actions))
    return Junction(src_stage, factory, dst_stage)
# generic tree-processing function
def create_junction(tool: CompilerFactory, src_stage: str, dst_stage: str,
                    hint: str = '') -> Junction:
    """Generic stage-creation function for tree-transforming stages where a
    tree-transforming stage is a stage that either reshapes a node-tree or
    transforms a nodetree into something else, but not a stage where
    something else (e.g. a text) is turned into a node-tree.

    :param tool: a factory (function or class) that returns the
        transformation function or callable object. Passing a
        transformation- or evaluation-table (dict) instead is deprecated.
    :param src_stage: name of the source stage.
    :param dst_stage: name of the destination stage.
    :param hint: deprecated. For tables only, one of "transtable",
        "evaluate_with_path" or "evaluate_without_path".
    :returns: the junction.
    :raises AssertionError: if ``tool`` is a table whose kind can neither
        be derived from its values nor from ``hint``.
    """
    if hint:
        deprecation_warning("Parameter 'hint' of function DHParser.pipeline.create_junction() "
                            "is deprecated and should not be used any more!")
    if callable(tool):
        return create_compiler_junction(tool, src_stage, dst_stage)

    deprecation_warning("Parameter 'tool' of DHParser.pipeline.create_junction() must be a "
                        "CompilerFactory, i.e. a factory function or class that returns a "
                        "transformation function or callable object. Dictionaries, and in "
                        "particular transformation or evaluation tables, are not supported "
                        "anymore. Please wrap your table into a factory function instead!")
    # A table with sequence values (e.g. lists of transformation functions)
    # is treated as a transformation table.
    if any(isinstance(value, Sequence) for value in tool.values()) \
            or hint == "transtable":
        return create_transtable_junction(tool, src_stage, dst_stage)
    if hint == "evaluate_with_path":
        return create_evaluation_junction(tool, src_stage, dst_stage, True)
    if hint == "evaluate_without_path":
        return create_evaluation_junction(tool, src_stage, dst_stage, False)
    raise AssertionError('Cannot determine transformation-type automatically! '
                         'Please, add optional parameter "hint" to the function call with one of the '
                         'following values: "transtable", "evaluate_with_path", "evaluate_without_path"!')