Source code for dsl

# dsl.py - Support for domain specific notations for DHParser
#
# Copyright 2016  by Eckhart Arnold (arnold@badw.de)
#                 Bavarian Academy of Sciences an Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.

"""
Module ``dsl`` contains high-level functions for the compilation
of domain specific languages based on an EBNF-grammar.
"""

from __future__ import annotations

from collections import namedtuple
import concurrent.futures
from functools import lru_cache
import inspect
import os
import platform
import stat
import sys
from typing import Any, cast, List, Tuple, Union, Iterator, Iterable, Optional, \
    Callable, Sequence, Dict, Set

import DHParser.ebnf
from DHParser.compile import Compiler, compile_source, CompilerFactory
from DHParser.pipeline import full_pipeline, Junction
from DHParser.configuration import get_config_value, set_config_value
from DHParser.ebnf import EBNFCompiler, grammar_changed, DHPARSER_IMPORTS, \
    get_ebnf_preprocessor, get_ebnf_grammar, get_ebnf_transformer, get_ebnf_compiler
from DHParser.error import Error, is_error, has_errors, only_errors, canonical_error_strings, \
    CANNOT_VERIFY_TRANSTABLE_WARNING, ErrorCode, ERROR
from DHParser.log import suspend_logging, resume_logging, is_logging, log_dir, append_log
from DHParser.nodetree import Node
from DHParser.parse import Grammar, ParserFactory
from DHParser.preprocess import nil_preprocessor, PreprocessorFunc, \
    PreprocessorFactory
from DHParser.transform import TransformerFunc, TransformationDict, TransformerFactory
from DHParser.toolkit import DHPARSER_DIR, load_if_file, is_python_code, is_filename, \
    compile_python_object, re, as_identifier, cpu_count, \
    deprecated, instantiate_executor
from DHParser.versionnumber import __version__, __version_info__


__all__ = ('DefinitionError',
           'CompilationError',
           'read_template',
           'load_compiler_suite',
           'compileDSL',
           'raw_compileEBNF',
           'compileEBNF',
           'grammar_provider',
           'create_parser',
           'compile_on_disk',
           'recompile_grammar',
           'create_scripts',
           'restore_server_script',
           'process_file',
           'never_cancel',
           'batch_process')


[docs] @lru_cache() def read_template(template_name: str) -> str: """ Reads a script-template from a template file named ``template_name`` in the template-directory and returns it as a string. """ with open(os.path.join(DHPARSER_DIR, 'templates', template_name), 'r', encoding='utf-8') as f: return f.read()
SECTION_MARKER = """\n ####################################################################### # # {marker} # ####################################################################### \n""" RX_SECTION_MARKER = re.compile(SECTION_MARKER.format(marker=r'.*?SECTION.*?')) RX_WHITESPACE = re.compile(r'\s*') SYMBOLS_SECTION = "SYMBOLS SECTION - Can be edited. Changes will be preserved." PREPROCESSOR_SECTION = "PREPROCESSOR SECTION - Can be edited. Changes will be preserved." CUSTOM_PARSER_SECTION = "CUSTOM PARSER Section - Can be edited. Changes will be preserved." PARSER_SECTION = "PARSER SECTION - Don't edit! CHANGES WILL BE OVERWRITTEN!" AST_SECTION = "AST SECTION - Can be edited. Changes will be preserved." COMPILER_SECTION = "COMPILER SECTION - Can be edited. Changes will be preserved." END_SECTIONS_MARKER = "END OF DHPARSER-SECTIONS" class DSLException(Exception): """ Base class for DSL-exceptions. """ def __init__(self, errors: Union[Sequence[Error], Iterator[Error]]): Exception.__init__(self) assert isinstance(errors, Iterator) or isinstance(errors, list) \ or isinstance(errors, tuple) self.errors = list(errors) def __str__(self): if len(self.errors) == 1: return str(self.errors[0]) return '\n' + '\n'.join(("%i. " % (i + 1) + str(err)) for i, err in enumerate(self.errors)) # return '\n'.join(str(err) for err in self.errors)
[docs] class DefinitionError(DSLException): """ Raised when (already) the grammar of a domain specific language (DSL) contains errors. Usually, these are repackaged parse.GrammarError(s). """ def __init__(self, errors, grammar_src): super().__init__(errors) self.grammar_src = grammar_src
[docs] class CompilationError(DSLException): """ Raised when a string or file in a domain specific language (DSL) contains errors. These can also contain definition errors that have been caught early. """ def __init__(self, errors, dsl_text, dsl_grammar, AST, result): super().__init__(errors) self.dsl_text = dsl_text self.dsl_grammar = dsl_grammar self.AST = AST self.result = result def __str__(self): return '\n'.join(str(error) for error in self.errors)
def error_str(messages: Iterable[Error]) -> str: """ Returns all true errors (i.e. not just warnings) from the ``messages`` as a concatenated multiline string. """ return '\n\n'.join(str(m) for m in messages if is_error(m.code)) def grammar_instance(grammar_representation) -> Tuple[Grammar, str]: """ Returns a grammar object and the source code of the grammar, from the given ``grammar``-data which can be either a file name, ebnf-code, python-code, a Grammar-derived grammar class or an instance of such a class (i.e. a grammar object already). """ if isinstance(grammar_representation, str): # read grammar grammar_src = load_if_file(grammar_representation) if is_python_code(grammar_src): parser_py = grammar_src # type: str messages = [] # type: List[Error] else: lg_dir = suspend_logging() result, messages, _ = compile_source( grammar_src, None, get_ebnf_grammar(), get_ebnf_transformer(), get_ebnf_compiler()) parser_py = cast(str, result) resume_logging(lg_dir) if has_errors(messages): raise DefinitionError(only_errors(messages), grammar_src) imports = DHPARSER_IMPORTS grammar_class = compile_python_object(imports + parser_py, r'\w+Grammar$') if inspect.isclass(grammar_class) and issubclass(grammar_class, Grammar): parser_root = grammar_class() else: raise ValueError('Could not compile or Grammar class!') else: # assume that dsl_grammar is a ParserHQ-object or Grammar class grammar_src = '' if isinstance(grammar_representation, Grammar): parser_root = grammar_representation else: # assume ``grammar_representation`` is a grammar class and get the root object parser_root = grammar_representation() return parser_root, grammar_src
[docs] def compileDSL(text_or_file: str, preprocessor: Optional[PreprocessorFunc], dsl_grammar: Union[str, Grammar], ast_transformation: TransformerFunc, compiler: Compiler, fail_when: ErrorCode = ERROR) -> Any: """ Compiles a text in a domain specific language (DSL) with an EBNF-specified grammar. Returns the compiled text or raises a compilation error. Raises: CompilationError if any errors occurred during compilation """ assert isinstance(text_or_file, str) assert isinstance(compiler, Compiler) parser, grammar_src = grammar_instance(dsl_grammar) result, messages, AST = compile_source(text_or_file, preprocessor, parser, ast_transformation, compiler) if has_errors(messages, fail_when): src = load_if_file(text_or_file) raise CompilationError(only_errors(messages, fail_when), src, grammar_src, AST, result) return result
[docs] def raw_compileEBNF(ebnf_src: str, branding="DSL", fail_when: ErrorCode = ERROR) -> EBNFCompiler: """ Compiles an EBNF grammar file and returns the compiler object that was used and which can now be queried for the result as well as skeleton code for preprocessor, transformer and compiler objects. Args: ebnf_src(str): Either the file name of an EBNF grammar or the EBNF grammar itself as a string. branding (str): Branding name for the compiler suite source code. Returns: An instance of class ``ebnf.EBNFCompiler`` Raises: CompilationError if any errors occurred during compilation """ grammar = get_ebnf_grammar() compiler = get_ebnf_compiler(branding, ebnf_src) transformer = get_ebnf_transformer() compileDSL(ebnf_src, nil_preprocessor, grammar, transformer, compiler, fail_when) return compiler
VERSION_CHECK=f"""import DHParser.versionnumber if DHParser.versionnumber.__version_info__ < {__version_info__}: print(f'DHParser version {{DHParser.versionnumber.__version__}} is lower than the DHParser ' f'version {__version__}, {{os.path.basename(__file__)}} has first been generated with. ' f'Please install a more recent version of DHParser to avoid unexpected errors!') """
[docs] def compileEBNF(ebnf_src: str, branding="DSL") -> str: """ Compiles an EBNF source file and returns the source code of a compiler suite with skeletons for preprocessor, transformer and compiler. Args: ebnf_src(str): Either the file name of an EBNF-grammar or the EBNF-grammar itself as a string. branding (str): Branding name for the compiler suite source code. Returns: The complete compiler suite skeleton as Python source code. Raises: CompilationError if any errors occurred during compilation """ compiler = raw_compileEBNF(ebnf_src, branding) src = ["#/usr/bin/python\n", SECTION_MARKER.format(marker=SYMBOLS_SECTION), DHPARSER_IMPORTS, VERSION_CHECK, SECTION_MARKER.format(marker=PREPROCESSOR_SECTION), compiler.gen_preprocessor_skeleton(), SECTION_MARKER.format(marker=CUSTOM_PARSER_SECTION), compiler.gen_custom_parser_example(), SECTION_MARKER.format(marker=PARSER_SECTION), compiler.result, SECTION_MARKER.format(marker=AST_SECTION), compiler.gen_transformer_skeleton(), SECTION_MARKER.format(marker=COMPILER_SECTION), compiler.gen_compiler_skeleton(), SECTION_MARKER.format(marker=END_SECTIONS_MARKER), read_template('DSLParser.pyi').format(NAME=branding)] return '\n'.join(src)
[docs] @lru_cache() def grammar_provider(ebnf_src: str, branding="DSL", additional_code: str = '', fail_when: ErrorCode = ERROR) -> ParserFactory: """ Compiles an EBNF-grammar and returns a grammar-parser provider function for that grammar. Args: ebnf_src(str): Either the file name of an EBNF grammar or the EBNF-grammar itself as a string. branding (str or bool): Branding name for the compiler suite source code. additional_code: Python code added to the generated source. This typically contains the source code of semantic actions referred to in the generated source, e.g. filter-functions, resume-point-search-functions Returns: A provider function for a grammar object for texts in the language defined by ``ebnf_src``. """ grammar_src = compileDSL( ebnf_src, get_ebnf_preprocessor(), get_ebnf_grammar(), get_ebnf_transformer(), get_ebnf_compiler(branding, ebnf_src), fail_when) log_name = get_config_value('compiled_EBNF_log') if log_name and is_logging(): append_log(log_name, grammar_src) imports = DHPARSER_IMPORTS parsing_stage = compile_python_object('\n'.join([imports, additional_code, grammar_src]), r'parsing') # r'get_(?:\w+_)?grammar$' if callable(parsing_stage.factory): parsing_stage.factory.python_src__ = grammar_src return parsing_stage.factory raise ValueError('Could not compile grammar provider!')
[docs] def create_parser(ebnf_src: str, branding="DSL", additional_code: str = '') -> Grammar: """Compiles the ebnf source into a callable Grammar-object. This is essentially syntactic sugar for ``grammar_provider(ebnf)()``. """ grammar_factory = grammar_provider(ebnf_src, branding, additional_code) grammar = grammar_factory() grammar.python_src__ = grammar_factory.python_src__ return grammar
def split_source(file_name: str, file_content: str) -> List[str]: """Splits the ``file_content`` into the seven sections: intro, imports, preprocessor_py, parser_py, ast_py, compiler_py, outro. Raises a value error, if the number of sections if not equal to 7. """ sections = RX_SECTION_MARKER.split(file_content) ls = len(sections) if ls != 7: raise ValueError('File "%s" contains %i instead of 7 sections. Please ' 'delete or repair file manually.' % (file_name, ls)) return sections
[docs] def load_compiler_suite(compiler_suite: str) -> \ Tuple[PreprocessorFactory, ParserFactory, TransformerFactory, CompilerFactory]: """ Extracts a compiler suite from file or string ``compiler_suite`` and returns it as a tuple (preprocessor, parser, ast, compiler). Returns: 4-tuple (preprocessor function, parser class, ast transformer function, compiler class) """ global RX_SECTION_MARKER assert isinstance(compiler_suite, str) source = load_if_file(compiler_suite) imports = DHPARSER_IMPORTS if is_python_code(compiler_suite): sections = split_source(compiler_suite, source) _, imports, preprocessor_py, parser_py, ast_py, compiler_py, _ = sections # TODO: Compile in one step and pick parts from namespace later ? preprocessor = compile_python_object(imports + preprocessor_py, r'preprocessing').factory parser = compile_python_object(imports + parser_py, r'parsing').factory ast = compile_python_object(imports + ast_py, r'ASTTransformation').factory else: # Assume source is an ebnf grammar. # Is there really any reasonable application case for this? lg_dir = suspend_logging() compiler_py, messages, _ = compile_source(source, None, get_ebnf_grammar(), get_ebnf_transformer(), get_ebnf_compiler(compiler_suite, source)) resume_logging(lg_dir) if has_errors(messages): raise DefinitionError(only_errors(messages), source) preprocessor = get_ebnf_preprocessor parser = get_ebnf_grammar ast = get_ebnf_transformer compiler = compile_python_object(imports + compiler_py, r'compiling').factory if callable(preprocessor) and callable(parser) and callable(Callable) and callable(compiler): return preprocessor, parser, ast, compiler raise ValueError('Could not generate compiler suite from source code!')
def is_outdated(compiler_suite: str, grammar_source: str) -> bool: """ Returns ``True`` if the ``compile_suite`` needs to be updated. An update is needed, if either the grammar in the compiler suite does not reflect the latest changes of ``grammar_source`` or if sections from the compiler suite have diligently been overwritten with whitespace order to trigger their recreation. Note: Do not delete or overwrite the section marker itself. Args: compiler_suite: the parser class representing the grammar or the file name of a compiler suite containing the grammar grammar_source: File name or string representation of the EBNF code of the grammar Returns (bool): True, if ``compiler_suite`` seems to be out of date. """ try: _, grammar, _, _ = load_compiler_suite(compiler_suite) return grammar_changed(grammar(), grammar_source) except ValueError: return True def run_compiler(text_or_file: str, compiler_suite: str, fail_when: ErrorCode = ERROR) -> Any: """Compiles a source with a given compiler suite. Args: text_or_file (str): Either the file name of the source code or the source code directly. (Which is determined by heuristics. If ``text_or_file`` contains at least on linefeed then it is always assumed to be a source text and not a file name.) compiler_suite(str): File name of the compiler suite to be used. Returns: The result of the compilation, the form and type of which depends entirely on the compiler. Raises: CompilerError """ preprocessor, parser, ast, compiler = load_compiler_suite(compiler_suite) return compileDSL(text_or_file, preprocessor(), parser(), ast(), compiler(), fail_when)
[docs] def compile_on_disk(source_file: str, parser_name: str = '', compiler_suite: str = "", extension: str = ".xml") -> Iterable[Error]: """ Compiles a source file with a given compiler and writes the result to a file. If no ``compiler_suite`` is given it is assumed that the source file is an EBNF grammar. In this case the result will be a Python script containing a parser for that grammar as well as the skeletons for a preprocessor, AST transformation table, and compiler. If the Python script already exists only the parser name in the script will be updated. (For this to work, the different names need to be delimited section marker blocks.). :py:func:`compile_on_disk` returns a list of error messages or an empty list if no errors occurred. :param source_file: The file name of the source text to be compiled. :param parser_name: The name of the generated parser. If the empty string is passed, the default name "...Parser.py" will be used. :param compiler_suite: The file name of the parser/compiler-suite (usually ending with 'Parser.py'), with which the source file shall be compiled. If this is left empty, the source file is assumed to be an EBNF-Grammar that will be compiled with the internal EBNF-Compiler. :param extension: The result of the compilation (if successful) is written to a file with the same name but a different extension than the source file. This parameter sets the extension. :returns: A (potentially empty) list of error or warning messages. """ filepath = os.path.normpath(source_file) rootname = os.path.splitext(filepath)[0] if not parser_name: parser_name = rootname + 'Parser.py' f = None # Optional[TextIO] with open(source_file, encoding="utf-8") as f: source = f.read() # dhpath = relative_path(os.path.dirname(rootname), DHPARSER_PARENTDIR) compiler_name = as_identifier(os.path.basename(rootname)) if compiler_suite: sfactory, pfactory, tfactory, cfactory = load_compiler_suite(compiler_suite) compiler1 = cfactory() else: sfactory = get_ebnf_preprocessor # PreprocessorFactory pfactory = get_ebnf_grammar # ParserFactory tfactory = get_ebnf_transformer # TransformerFactory cfactory = get_ebnf_compiler # CompilerFactory compiler1 = cfactory() # CompilerFunc is_ebnf_compiler = False # type: bool if isinstance(compiler1, EBNFCompiler): is_ebnf_compiler = True compiler1.set_grammar_name(compiler_name, source_file) result, messages, _ = compile_source(source, sfactory(), pfactory(), tfactory(), compiler1) if has_errors(messages): return messages elif is_ebnf_compiler: # trans == get_ebnf_transformer or trans == EBNFTransformer: # either an EBNF- or no compiler suite given ebnf_compiler = cast(EBNFCompiler, compiler1) # type: EBNFCompiler global SECTION_MARKER, RX_SECTION_MARKER, PREPROCESSOR_SECTION, PARSER_SECTION, \ AST_SECTION, COMPILER_SECTION, END_SECTIONS_MARKER, RX_WHITESPACE f = None try: f = open(parser_name, 'r', encoding="utf-8") source = f.read() sections = split_source(parser_name, source) intro, imports, preprocessor, _, ast, compiler, outro = sections ast_trans_python_src = imports + ast ast_trans_table = dict() # type: TransformationDict try: ast_trans_table = compile_python_object(ast_trans_python_src, r'(?:\w+_)?AST_transformation_table$') except Exception as e: if isinstance(e, NameError): err_str = 'NameError "{}" while compiling AST-Transformation. ' \ 'Possibly due to a forgotten import at the beginning ' \ 'of the AST-Block (!)'.format(str(e)) elif isinstance(e, ValueError): err_str = f'Exception {type(e)}: "{e}" while compiling AST-Transformation. ' \ f'This warning can safely be ignored, if a different method ' \ f'without a transformation-table or no AST-transformation at ' \ f'all is used for "{os.path.basename(rootname)}".' else: err_str = 'Exception {} while compiling AST-Transformation: {}' \ .format(str(type(e)), str(e)) messages.append(Error(err_str, 0, CANNOT_VERIFY_TRANSTABLE_WARNING)) if is_logging(): with open(os.path.join(log_dir(), rootname + '_AST_src.py'), 'w', encoding='utf-8') as f: f.write(ast_trans_python_src) messages.extend(ebnf_compiler.verify_transformation_table(ast_trans_table)) # TODO: Verify compiler except (PermissionError, FileNotFoundError, IOError): intro, imports, preprocessor, _, ast, compiler, outro = '', '', '', '', '', '', '' finally: if f: f.close() f = None if RX_WHITESPACE.fullmatch(intro): intro = '#!/usr/bin/env python3' if RX_WHITESPACE.fullmatch(outro): outro = read_template('DSLParser.pyi').format(NAME=compiler_name) if RX_WHITESPACE.fullmatch(imports): imports = DHParser.ebnf.DHPARSER_IMPORTS + VERSION_CHECK elif imports.find("from DHParser.") < 0 \ or imports.find('PseudoJunction') < 0 \ or imports.find('create_parser_junction') < 0: imports += "\nfrom DHParser.pipeline import PseudoJunction, create_parser_junction\n" if RX_WHITESPACE.fullmatch(preprocessor): preprocessor = ebnf_compiler.gen_preprocessor_skeleton() if RX_WHITESPACE.fullmatch(ast): ast = ebnf_compiler.gen_transformer_skeleton() if RX_WHITESPACE.fullmatch(compiler): compiler = ebnf_compiler.gen_compiler_skeleton() try: f = open(parser_name, 'w', encoding="utf-8") f.write(intro) f.write(SECTION_MARKER.format(marker=SYMBOLS_SECTION)) f.write(imports) f.write(SECTION_MARKER.format(marker=PREPROCESSOR_SECTION)) f.write(preprocessor) f.write(SECTION_MARKER.format(marker=PARSER_SECTION)) f.write(cast(str, result)) f.write(SECTION_MARKER.format(marker=AST_SECTION)) f.write(ast) f.write(SECTION_MARKER.format(marker=COMPILER_SECTION)) f.write(compiler) f.write(SECTION_MARKER.format(marker=END_SECTIONS_MARKER)) f.write(outro) except (PermissionError, FileNotFoundError, IOError) as error: print(f'# Could not write file "{parser_name}" because of: ' + "\n# ".join(str(error).split('\n)'))) print(result) finally: if f: f.close() if platform.system() != "Windows": # set file permissions so that the parser_name can be executed st = os.stat(parser_name) os.chmod(parser_name, st.st_mode | stat.S_IEXEC) else: f = None try: f = open(rootname + extension, 'w', encoding="utf-8") if isinstance(result, Node): if extension.lower() == '.xml': f.write(result.as_xml()) else: f.write(result.as_sxpr()) elif isinstance(result, str): f.write(result) else: raise AssertionError('Illegal result type: ' + str(type(result))) except (PermissionError, FileNotFoundError, IOError) as error: print('# Could not write file "' + rootname + '.py" because of: ' + "\n# ".join(str(error).split('\n)'))) print(result) finally: if f: f.close() return messages
[docs] def recompile_grammar(ebnf_filename: str, parser_name: str = '', force: bool = False, notify: Callable = lambda: None) -> bool: """ Re-compiles an EBNF-grammar if necessary, that is, if either no corresponding 'XXXXParser.py'-file exists or if that file is outdated. :param ebnf_filename: The filename of the ebnf-source of the grammar. In case this is a directory and not a file, all files within this directory ending with .ebnf will be compiled. :param parser_name: The name of the compiler script. If not given the ebnf-filename without extension and with the addition of "Parser.py" will be used. :param force: If False (default), the grammar will only be recompiled if it has been changed. :param notify: 'notify' is a function without parameters that is called when recompilation actually takes place. This can be used to inform the user. :returns: True, if recompilation of grammar has been successful or did not take place, because the Grammar hasn't changed since the last compilation. False, if the recompilation of the grammar has been attempted but failed. """ if os.path.isdir(ebnf_filename): success = True for entry in os.listdir(ebnf_filename): if entry.lower().endswith('.ebnf') and os.path.isfile(entry): success = success and recompile_grammar(entry, parser_name, force) return success base, _ = os.path.splitext(ebnf_filename) if not parser_name: parser_name = base + 'Parser.py' error_file_name = base + '_ebnf_MESSAGES.txt' messages = [] # type: Iterable[Error] if (not os.path.exists(parser_name) or force or grammar_changed(parser_name, ebnf_filename)): notify() messages = compile_on_disk(ebnf_filename, parser_name) # try again with heuristic EBNF-parser, if parser failed due to a # different EBNF-dialect if len(messages) == 2 and str(messages[1]).find("'DEF' expected by parser 'definition'") >= 0: syntax_variant = get_config_value('syntax_variant') if syntax_variant != 'heuristic': set_config_value('syntax_variant', 'heuristic') messages2 = compile_on_disk(ebnf_filename, parser_name) set_config_value('syntax_variant', syntax_variant) if not has_errors(messages2): messages = messages2 if messages: # print("Errors while compiling: " + ebnf_filename + '!') with open(error_file_name, 'w', encoding="utf-8") as f: for e in messages: f.write(str(e)) f.write('\n') if has_errors(messages): return False if not messages and os.path.exists(error_file_name): os.remove(error_file_name) return True
[docs] def create_scripts(ebnf_filename: str, parser_name: str = '', server_name: Optional[str] = '', app_name: Optional[str] = '', overwrite: bool = False): """Creates a parser script from the grammar with the filename ``ebnf_filename'`` or, if ebnf_filename referes to a directory from all grammars in files ending with ".ebnf" in that directory. If ``server_name`` is not None a script for starting a parser-server will be created as well. Running the parser as a server has the advantage that the startup time for calling the parser is greatly reduced for subsequent parser calls. (While the same can be achieved with running the parser script in batch-processing-mode by passing a directory or several filenames on the command line to the parser script, batch processing is not suitable for all application cases. For example, it is not usable when implementing language servers to feed editors with data from the parseing process.) if ``app_name`` is not None an application script with a tkinter-based graphical user interface will be created as well. (When distributing this script with pyinstaller, parallel processing should be turned off at least on MS Windows systems!) :param ebnf_filename: The filename of the grammar, from which the servfer script's filename is derived. :param parser_name: The filename of the parser script or the empty string if the default filename shall be used. :param server_name: The filename of the server script of the empty string if the default filename shall be used, or None if no server script shall be created. :param app_name: The filename of the server script of the empty string if the default filename shall be used, or None if no app-script shall be created :param overwrite: If True an existing server script will be overwritten. """ if os.path.isdir(ebnf_filename): for entry in os.listdir(ebnf_filename): if entry.lower().endswith('.ebnf') and os.path.isfile(entry): restore_server_script(entry) return base, _ = os.path.splitext(ebnf_filename) name = os.path.basename(base) def create_script(script_name, template_name): nonlocal base template = read_template(template_name) with open(script_name, 'w', encoding='utf-8') as f: f.write(template.replace('DSL', name)) if platform.system() != "Windows": # set file permissions so that the server-script can be executed st = os.stat(script_name) os.chmod(script_name, st.st_mode | stat.S_IEXEC) if not parser_name: parser_name = base + 'Parser.py' if server_name is not None and not server_name: server_name = base + 'Server.py' if app_name is not None and not app_name: app_name = base + 'App.py' if server_name is not None and (not os.path.exists(server_name) or overwrite): create_script(server_name, "DSLServer.pyi") if app_name is not None and (not os.path.exists(app_name) or overwrite): create_script(app_name, "DSLApp.pyi") if not os.path.exists(parser_name): recompile_grammar(ebnf_filename, parser_name)
@deprecated("restore_server_script() is deprecated! Please, use create_scripts().") def restore_server_script(ebnf_filename: str, parser_name: str = '', server_name: str = '', overwrite: bool = False): create_scripts(ebnf_filename, parser_name, server_name, None, overwrite) ####################################################################### # # batch-processing # #######################################################################
[docs] def process_file(source: str, out_dir: str, preprocessor_factory: PreprocessorFactory, parser_factory: ParserFactory, junctions: Set[Junction], targets: Set[str], serializations: Dict[str, List[str]]) -> str: """Compiles the source and writes the serialized results back to disk, unless any fatal errors have occurred. Error and Warning messages are written to a file with the same name as `result_filename` with an appended "_ERRORS.txt" or "_WARNINGS.txt" in place of the name's extension. Returns the name of the error-messages file or an empty string, if no errors or warnings occurred. :param source: the source document or the filename of the source-document :param out_dir: the path of the output-directory. If the output-directory does not exist, it will be created. :param preprocessor_factory: A factory-function that returns a preprocessing function. :param parser_factory: A factory-function that returns a parser function which, usually, is a :py:class:`parse.Grammar`-object. :param junctions: a set of junctions for all processing stages beyond parsing. :param serializations: A dictionary of serialization names, e.g. "sxpr", "xml", "json" for those target stages that still are node-trees. These will be serialized and written to disk in all given serializations. :return: either the empty string or the file name of a file that contains the errors or warnings that occurred while processing the source. """ source_filename = source if is_filename(source) else 'unknown_document_name' dest_name = os.path.splitext(os.path.basename(source_filename))[0] results = full_pipeline(source, preprocessor_factory, parser_factory, junctions, targets) end_results = {t: r for t, r in results.items() if t in targets} # create target directories for t in end_results.keys(): path = os.path.join(out_dir, t) try: # do not use os.path.exists(), here: RACE CONDITION!! os.makedirs(path) except FileExistsError: if not os.path.isdir(path): error_file_name = dest_name + '_FATAL_ERROR.txt' with open(error_file_name, 'w', encoding='utf-8') as f: f.write(f'Destination directory path "{path}" already in use!') return error_file_name # write data errors = []; errstrs = set() items = end_results.items() if len(end_results) == len(targets) else results.items() for t, r in items: result, err = r for e in err: estr = str(e) if estr not in errstrs: errstrs.add(estr) errors.append(e) if t in end_results: path = os.path.join(out_dir, t, '.'.join([dest_name, t])) if isinstance(result, Node): slist = serializations.get(t, serializations.get( '*', get_config_value('default_serialization'))) for s in slist: s = s.lower() # normalize file-extensions: if s == 'default': s = get_config_value('default_serialization').lower() elif s == 's-expression': s = 'sxpr' elif s == 'indented': s = 'tree' data = result.serialize(s) with open('.'.join([path, s]), 'w', encoding='utf-8') as f: f.write(data) else: with open(path, 'w', encoding='utf-8') as f: f.write(str(result)) errors.sort(key=lambda e: e.pos) if errors: err_ext = '_ERRORS.txt' if has_errors(errors, ERROR) else '_WARNINGS.txt' err_filename = os.path.join(out_dir, dest_name + err_ext) with open(err_filename, 'w', encoding='utf-8') as f: f.write('\n'.join(canonical_error_strings(errors))) return err_filename return ''
def never_cancel() -> bool: return False
[docs] def batch_process(file_names: List[str], out_dir: str, process_file: Callable[[Tuple[str, str]], str], *, submit_func: Callable = None, log_func: Callable = None, cancel_func: Callable = never_cancel) -> List[str]: """Compiles all files listed in file_names and writes the results and/or error messages to the directory `our_dir`. Returns a list of error messages files. """ def collect_results(res_iter, file_names, log_func, cancel_func) -> List[str]: error_list = [] if cancel_func(): return error_list for file_name, error_filename in zip(file_names, res_iter): if log_func: suffix = (" with " + error_filename[error_filename.rfind('_') + 1:-4]) \ if error_filename else "" log_func(f'Compiled "%s"' % os.path.basename(file_name) + suffix) if error_filename: error_list.append(error_filename) if cancel_func(): return error_list return error_list if submit_func is None: pool = instantiate_executor(get_config_value('batch_processing_parallelization'), concurrent.futures.ProcessPoolExecutor) res_iter = pool.map(process_file, ((name, out_dir) for name in file_names), chunksize=min(get_config_value('batch_processing_max_chunk_size'), max(1, len(file_names) // (cpu_count() * 4)))) error_files = collect_results(res_iter, file_names, log_func, cancel_func) if sys.version_info >= (3, 9): pool.shutdown(wait=True, cancel_futures=True) else: pool.shutdown(wait=True) else: futures = [submit_func(process_file, name, out_dir) for name in file_names] res_iter = (f.result() for f in futures) error_files = collect_results(res_iter, file_names, log_func, cancel_func) for f in futures: f.cancel() concurrent.futures.wait(futures) return error_files
# moved or deprecated functions PseudoJunction = namedtuple('PseudoJunction', ['factory'], module=__name__) @deprecated('create_preprocess_junction() has moved to the pipeline-module! Use "from DHParser.pipeline import create_preprocess_junction"') def create_preprocess_junction(tokenizer, include_regex, comment_regex, derive_file_name = lambda name: name): from DHParser import pipeline return pipeline.create_preprocess_junction( tokenizer, include_regex, comment_regex, derive_file_name) @deprecated('The name "create_preprocess_transition()" is deprecated. Use "DHParser.pipeline.create_preprocess_junction()" instead.') def create_preprocess_transition(*args, **kwargs): return create_preprocess_junction(*args, **kwargs) @deprecated('create_parser_junction() has moved to the pipeline-module! Use "from DHParser.pipeline import create_parser_junction"') def create_parser_junction(grammar_class): from DHParser import pipeline return pipeline.create_parser_junction(grammar_class) @deprecated('The name "create_parser_transition()" is deprecated. Use "DHParser.pipeline.create_parser_junction()" instead.') def create_parser_transition(*args, **kwargs): return create_parser_junction(*args, **kwargs) @deprecated('create_compiler_junction() has moved to the pipeline-module! Use "from DHParser.pipeline import create_compiler_junction"') def create_compiler_junction(compile_class, src_stage, dst_stage): from DHParser import pipeline return pipeline.create_compiler_junction(compile_class, src_stage, dst_stage) @deprecated('The name "create__compiler_transition()" is deprecated. Use "DHParser.pipeline.create_compiler_junction()" instead.') def create_compiler_transition(*args, **kwargs): return create_compiler_junction(*args, **kwargs) @deprecated("DHParser.dsl.create_transtable_transition() is deprecated, " "because it does not work with lambdas as transformer functions!") def create_transtable_junction(table, src_stage, dst_stage): # This does not work if table contains functions that cannot be pickled (i.e. lambda-functions)! from DHParser import pipeline return create_transtable_transition(table, src_stage, dst_stage) @deprecated('The name "create_transtable_transition()" is deprecated. Use "DHParser.pipeline.create_transtable_junction()" instead.') def create_transtable_transition(*args, **kwargs): return create_transtable_junction(*args, **kwargs) @deprecated('create_evaluation_junction() has moved to the pipeline-module! Use "from DHParser.pipeline import create_evaluation_junction"') def create_evaluation_junction(actions, src_stage, dst_stage, supply_path_arg): from DHParser import pipeline return pipeline.create_evaluation_junction(actions, src_stage, dst_stage, supply_path_arg) @deprecated('The name "create_evaluation_transition()" is deprecated. Use "DHParser.pipeline.create_evaluation_junction()" instead.') def create_evaluation_transition(*args, **kwargs): return create_evaluation_junction(*args, **kwargs) @deprecated('create_junction() has moved to the pipeline-module! Use "from DHParser.pipeline import create_junction"') def create_junction(tool, src_stage, dst_stage, hint = "?"): from DHParser import pipeline return pipeline.create_junction(tool, src_stage, dst_stage, hint) @deprecated('The name "create_transition()" is deprecated. Use "DHParser.pipeline.create_junction()" instead.') def create_transition(*args, **kwargs): return create_junction(*args, **kwargs)