# toolkit.py - utility functions for DHParser
#
# Copyright 2016 by Eckhart Arnold (arnold@badw.de)
# Bavarian Academy of Sciences an Humanities (badw.de)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""
Module ``toolkit`` contains utility functions that are needed across
several of the other DHParser-Modules Helper functions that are not
needed in more than one module are best placed within that module and
not in the toolkit-module. An acceptable exception to this rule are
functions that are very generic.
"""
from __future__ import annotations
import ast
import bisect
import concurrent.futures
import functools
import hashlib
import io
import json
import multiprocessing
import os
import sys
import threading
import traceback
import typing
try:
if sys.version.find('PyPy') >= 0:
import re # regex might not work with PyPy reliably: https://pypi.org/project/regex/
else:
import regex as re
except ImportError:
import re
try:
import dataclasses
except ImportError:
from DHParser.externallibs import dataclasses36 as dataclasses
from typing import Any, Iterable, Sequence, Set, AbstractSet, Union, Dict, List, Tuple, \
Optional, Type, Callable
try:
from typing import Protocol
except ImportError:
class Protocol:
pass
try:
from typing import TypeAlias
except ImportError:
from DHParser.externallibs.typing_extensions import TypeAlias
try:
import cython
cython_optimized = cython.compiled # type: bool
if cython_optimized: # not ?
import DHParser.externallibs.shadow_cython as cython
except (NameError, ImportError):
cython_optimized = False
import DHParser.externallibs.shadow_cython as cython
from DHParser.configuration import access_thread_locals, get_config_value, set_config_value, \
NEVER_MATCH_PATTERN
from DHParser.stringview import StringView
__all__ = ('typing',
're',
'dataclasses',
'Protocol',
'TypeAlias',
'cython_optimized',
'identify_python',
'identity',
# 'gen_id',
'ThreadLocalSingletonFactory',
'RX_NEVER_MATCH',
'RX_ENTITY',
'RX_NON_ASCII',
'validate_XML_attribute_value',
'fix_XML_attribute_value',
'lxml_XML_attribute_value',
'RxPatternType',
're_find',
'escape_re',
'escape_ctrl_chars',
'is_filename',
'is_html_name',
'relative_path',
'split_path',
'concurrent_ident',
'unrepr',
'abbreviate_middle',
'wrap_str_literal',
'wrap_str_nicely',
'printw',
'escape_formatstr',
'as_identifier',
'as_list',
'as_tuple',
'first',
'last',
'NOPE',
'matching_brackets',
'linebreaks',
'line_col',
'text_pos',
'normalize_docstring',
'issubtype',
'isgenerictype',
'load_if_file',
'is_python_code',
'md5',
'expand_table',
'compile_python_object',
'smart_list',
'JSON_Type',
'JSON_Dict',
'JSONstr',
'JSONnull',
'json_encode_string',
'json_dumps',
'json_rpc',
'pp_json',
'pp_json_str',
'sane_parser_name',
'normalize_circular_path',
'DHPARSER_DIR',
'deprecated',
'deprecation_warning',
'SingleThreadExecutor',
'multiprocessing_broken',
'instantiate_executor',
'cpu_count')
#######################################################################
#
# miscellaneous (generic)
#
#######################################################################
[docs]
def identify_python() -> str:
"""Returns a reasonable identification string for the python interpreter,
e.g. "cpython 3.8.6"."""
return "%s %i.%i.%i" % (sys.implementation.name, *sys.version_info[:3])
[docs]
def identity(x):
"""Canonical identity function. The purpose of defining identity()
here is to allow it to serve as a default value and to be
able to check whether a function parameter has been assigned
another than the default value or not."""
return x
global_id_counter: int = 0
def gen_id() -> int:
"""Generates a unique id. (Not thread-safe!)"""
global global_id_counter
global_id_counter += 1
return global_id_counter
[docs]
class ThreadLocalSingletonFactory:
"""
Generates a singleton-factory that returns one and
the same instance of `class_or_factory` for one and the
same thread, but different instances for different threads.
Note: Parameter uniqueID should be provided if class_or_factory is not
unique but generic. See source code of
:py:func:`DHParser.dsl.create_transtable_junction`
"""
def __init__(self, class_or_factory, name: str = "", *,
uniqueID: Union[str, int] = 0,
ident=None):
if ident is not None:
deprecation_warning('Parameter "ident" of DHParser.toolkit.ThreadLocalSingletonFactory'
' is deprecated and will be ignored.')
self.class_or_factory = class_or_factory
# partial functions do not have a __name__ attribute!
name = name or getattr(class_or_factory, '__name__', '') or class_or_factory.func.__name__
if not uniqueID:
if not hasattr(self.__class__, 'lock'):
self.__class__.lock = threading.Lock()
with self.__class__.lock:
uniqueID = gen_id()
self.singleton_name = f"{name}_{str(id(self))}_{str(uniqueID)}_singleton"
THREAD_LOCALS = access_thread_locals()
assert not hasattr(THREAD_LOCALS, self.singleton_name), self.singleton_name
def __call__(self):
THREAD_LOCALS = access_thread_locals()
try:
singleton = getattr(THREAD_LOCALS, self.singleton_name)
except AttributeError:
setattr(THREAD_LOCALS, self.singleton_name, self.class_or_factory())
singleton = getattr(THREAD_LOCALS, self.singleton_name)
return singleton
[docs]
@functools.lru_cache()
def is_filename(strg: str) -> bool:
"""
Tries to guess whether string ``strg`` is a file name.
"""
return strg.find('\n') < 0 and strg[:1] != " " and strg[-1:] != " " \
and all(strg.find(ch) < 0 for ch in '*?"<>|')
# and strg.select_if('*') < 0 and strg.select_if('?') < 0
[docs]
def is_html_name(url: str) -> bool:
"""Returns True, if url ends with .htm or .html"""
return url[-5:].lower() == '.html' or url[-4:].lower() == '.htm'
[docs]
@cython.locals(i=cython.int, L=cython.int)
def relative_path(from_path: str, to_path: str) -> str:
"""Returns the relative path in order to open a file from
`to_path` when the script is running in `from_path`. Example:
>>> relative_path('project/common/dir_A', 'project/dir_B').replace(chr(92), '/')
'../../dir_B'
"""
from_path = os.path.normpath(os.path.abspath(from_path)).replace('\\', '/')
to_path = os.path.normpath(os.path.abspath(to_path)).replace('\\', '/')
if from_path and from_path[-1] != '/':
from_path += '/'
if to_path and to_path[-1] != '/':
to_path += '/'
i = 0
L = min(len(from_path), len(to_path))
while i < L and from_path[i] == to_path[i]:
i += 1
return os.path.normpath(from_path[i:].count('/') * '../' + to_path[i:])
[docs]
def split_path(path: str) -> Tuple[str, ...]:
"""Splits a filesystem path into its components. Other than
os.path.split() it does not only split of the last part::
>>> split_path('a/b/c')
('a', 'b', 'c')
>>> os.path.split('a/b/c') # for comparison.
('a/b', 'c')
"""
split = os.path.split(path)
while split[0]:
split = os.path.split(split[0]) + split[1:]
return split[1:]
[docs]
def concurrent_ident() -> str:
"""
Returns an identificator for the current process and thread
"""
if sys.version_info >= (3, 8, 0):
return multiprocessing.current_process().name + '_' + str(threading.get_native_id())
else:
return multiprocessing.current_process().name + '_' + str(threading.get_ident())
[docs]
class unrepr:
"""
unrepr encapsulates a string representing a python function in such
a way that the representation of the string yields the function call
itself rather than a string representing the function call and delimited
by quotation marks.
Example:
>>> "re.compile(r'abc+')"
"re.compile(r'abc+')"
>>> unrepr("re.compile(r'abc+')")
re.compile(r'abc+')
"""
def __init__(self, s: str):
self.s = s # type: str
def __eq__(self, other: object) -> bool:
if isinstance(other, unrepr):
return self.s == other.s
elif isinstance(other, str):
return self.s == other
else:
raise TypeError('unrepr objects can only be compared with '
'other unrepr objects or strings!')
def __str__(self) -> str:
return self.s
def __repr__(self) -> str:
return self.s
[docs]
def as_list(item_or_sequence) -> List[Any]:
"""Turns an arbitrary sequence or a single item into a list. In case of
a single item, the list contains this element as its sole item."""
if isinstance(item_or_sequence, Iterable):
return list(item_or_sequence)
return [item_or_sequence]
[docs]
def as_tuple(item_or_sequence) -> Tuple[Any]:
"""Turns an arbitrary sequence or a single item into a tuple. In case of
a single item, the tuple contains this element as its sole item."""
if isinstance(item_or_sequence, Iterable):
return tuple(item_or_sequence)
return (item_or_sequence,)
[docs]
def first(item_or_sequence: Union[Sequence, Any]) -> Any:
"""Returns an item or the first item of a sequence of items."""
if isinstance(item_or_sequence, Sequence):
return item_or_sequence[0]
else:
return item_or_sequence
[docs]
def last(item_or_sequence: Union[Sequence, Any]) -> Any:
"""Returns an item or the first item of a sequence of items."""
if isinstance(item_or_sequence, Sequence):
return item_or_sequence[-1]
else:
return item_or_sequence
DEPRECATION_WARNINGS_ISSUED: Set[str] = set()
[docs]
def deprecation_warning(message: str):
"""Issues a deprecation warning. Makes sure that each message is only
called once."""
if message not in DEPRECATION_WARNINGS_ISSUED:
DEPRECATION_WARNINGS_ISSUED.add(message)
try:
raise DeprecationWarning(message)
except DeprecationWarning as e:
try:
deprecation_policy = get_config_value('deprecation_policy')
except AssertionError as e:
deprecation_policy = 'warn'
print(e)
if deprecation_policy == 'warn':
stacktrace = traceback.format_exc()
print(stacktrace)
else:
raise e
[docs]
def deprecated(message: str) -> Callable:
"""Decorator that marks a function as deprecated and emits
a deprecation message on its first use::
>>> @deprecated('This function is deprecated!')
... def bad():
... pass
>>> save = get_config_value('deprecation_policy')
>>> set_config_value('deprecation_policy', 'fail')
>>> try: bad()
... except DeprecationWarning as w: print(w)
This function is deprecated!
>>> set_config_value('deprecation_policy', save)
"""
assert isinstance(message, str)
def decorator(f: Callable) -> Callable:
@functools.wraps(f)
def wrapper(*args, **kwargs):
deprecation_warning(message or str(f))
return f(*args, **kwargs)
return wrapper
return decorator
#######################################################################
#
# miscellaneous (DHParser-specific)
#
#######################################################################
DHPARSER_DIR = os.path.dirname(os.path.abspath(__file__))
# DHPARSER_PARENTDIR = os.path.dirname(DHPARSER_DIR.rstrip('/').rstrip('\\'))
[docs]
def sane_parser_name(name) -> bool:
"""
Checks whether given name is an acceptable parser name. Parser names
must not be preceded or succeeded by a double underscore '__'!
"""
return name and name[:2] != '__' and name[-2:] != '__'
[docs]
def normalize_circular_path(path: Union[Tuple[str, ...], AbstractSet[Tuple[str, ...]]]) \
-> Union[Tuple[str, ...], Set[Tuple[str, ...]]]:
"""Returns a normalized version of a `circular path` represented as
a tuple or - if called with a set of paths instead of a single path
- a set of normalized paths.
A circular (or "recursive") path is a tuple of items, the order of which
matters, but not the starting point. This can, for example, be a tuple of
references from one symbol defined in an EBNF source text back to
(but excluding) itself.
For example, when defining a grammar for arithmetic, the tuple
('expression', 'term', 'factor') if a recursive path, because the
definition of a factor includes a (bracketed) expression and thus
refers back to `expression`
Normalizing is done by "ring-shifting" the tuple so that it starts
with the alphabetically first symbol in the path.
>>> normalize_circular_path(('term', 'factor', 'expression'))
('expression', 'term', 'factor')
"""
if isinstance(path, AbstractSet):
return {normalize_circular_path(p) for p in path}
else:
assert isinstance(path, Tuple)
first_sym = min(path)
i = path.index(first_sym)
return path[i:] + path[:i]
#######################################################################
#
# string manipulation and regular expressions
#
#######################################################################
RX_NEVER_MATCH = re.compile(NEVER_MATCH_PATTERN)
RxPatternType = Any
[docs]
def re_find(s, r, pos=0, endpos=9223372036854775807):
"""
Returns the match of the first occurrence of the regular expression
`r` in string (or byte-sequence) `s`. This is essentially a wrapper
for `re.finditer()` to avoid a try-catch StopIteration block.
If `r` cannot be found, `None` will be returned.
"""
if isinstance(r, (str, bytes)):
if (pos, endpos) != (0, 9223372036854775807):
r = re.compile(r)
else:
try:
m = next(re.finditer(r, s))
return m
except StopIteration:
return None
if r:
try:
m = next(r.finditer(s, pos, endpos))
return m
except StopIteration:
return None
[docs]
@deprecated('escape_re() is deprecated. Use re.escape() from the Python-Standard-Library instead!')
def escape_re(strg: str) -> str:
"""
Returns the string with all regular expression special characters escaped.
"""
# assert isinstance(strg, str)
# re_chars = r"\.^$*+?{}[]()#<>=|!"
# for esc_ch in re_chars:
# strg = strg.replace(esc_ch, '\\' + esc_ch)
# return strg
return re.escape(strg)
[docs]
def escape_ctrl_chars(strg: str) -> str:
r"""
Replace all control characters (e.g. `\n` `\t`) in a string
by their back-slashed representation and replaces backslash by
double backslash.
"""
s = repr(strg.replace('\\', r'\\')).replace('\\\\', '\\')[1:-1]
if s[:2] == r"\'" and s[-2:] == r"\'":
return ''.join(["'", s[2:-2], "'"])
elif s[:2] == r'\"' and s[-2:] == r'\"':
return ''.join(['"', s[2:-2], '"'])
return s
[docs]
def normalize_docstring(docstring: str) -> str:
"""
Strips leading indentation as well as leading
and trailing empty lines from a docstring.
"""
lines = docstring.replace('\t', ' ').split('\n')
# determine indentation
indent = 255 # highest integer value
for line in lines[1:]:
stripped = line.lstrip()
if stripped: # ignore empty lines
indent = min(indent, len(line) - len(stripped))
if indent >= 255: indent = 0
# remove trailing empty lines
while lines and not lines[-1].strip(): lines.pop()
if lines:
if lines[0].strip():
lines = [lines[0]] + [line[indent:] for line in lines[1:]]
else:
lines = [line[indent:] for line in lines[1:]]
# remove any empty lines at the beginning
while lines and not lines[0].strip(): del lines[0]
return '\n'.join(lines)
return ''
[docs]
@cython.locals(max_length=cython.int, length=cython.int, a=cython.int, b=cython.int)
def abbreviate_middle(s: str, max_length: int) -> str:
"""Shortens string `s` by replacing the middle part with an ellipsis
sign ` ... ` if the size of the string exceeds `max_length`."""
assert max_length > 6
length = len(s) # type: int
if length > max_length:
a = max_length // 2 - 2 # type: int
b = max_length // 2 - 3 # type: int
s = s[:a] + ' ... ' + s[-b:] if length > 40 else s
return s
[docs]
def wrap_str_literal(s: Union[str, List[str]], column: int = 80, offset: int = 0) -> str:
r"""Wraps an excessively long string literal over several lines.
Example::
>>> s = '"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"'
>>> print(wrap_str_literal(s, 25))
"abcdefghijklmnopqrstuvwx"
"yzABCDEFGHIJKLMNOPQRSTUVW"
"XYZ0123456789"
>>> s = 'r"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"'
>>> print("Call(" + wrap_str_literal(s, 40, 5) + ")")
Call(r"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM"
r"NOPQRSTUVWXYZ0123456789")
>>> s = 'fr"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"'
>>> print("Call(" + wrap_str_literal(s, 40, 5) + ")")
Call(fr"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM"
fr"NOPQRSTUVWXYZ0123456789")
>>> s = ['r"abcde', 'ABCDE"']
>>> print(wrap_str_literal(s))
r"abcde"
r"ABCDE"
"""
if isinstance(s, list):
assert len(s) > 0
parts = s
s = '\n'.join(s)
else:
parts = None
if s[0:1].isalpha():
i = 2 if s[1:2].isalpha() else 1
r = s[0:i]
s = s[i:]
if parts is not None:
parts[0] = parts[0][i:]
else:
r = ''
q = s[0:1]
assert len(s) >= 2 and q == s[-1] and q in ('"', "'", "`", "´"), \
"string literal must be enclosed by quotation marks"
if parts is None:
parts = [s[i: i + column] for i in range(0, len(s), column)]
for i in range(len(parts) - 1):
p = parts[i]
k = 1
while k <= len(p) and p[-k] == '\\':
k += 1
k -= 1
if k > 0:
parts[i] = p[:-k]
parts[i + 1] = p[-k:] + parts[i + 1]
if r: parts[0] = r + parts[0]
wrapped = (''.join([q, '\n', ' ' * offset, r, q])).join(parts)
return wrapped
[docs]
@cython.locals(wrap_column=cython.int, tolerance=cython.int, a=cython.int, i=cython.int, k=cython.int, m=cython.int)
def wrap_str_nicely(s: str, wrap_column: int = 79, tolerance: int = 24,
wrap_chars: str = ")]>, ") -> str:
r"""Line-wraps a single-line output string at 'wrap_column'. Tries to
find a suitable point for wrapping, i.e. after any of the wrap_characters.
If the strings spans multiple lines, the existing linebreaks will be kept
and no rewrapping takes place. In order to enforce rewrapping of multiline
strings, use: ``wrap_str_nicely(repr(s)[1:-1])``. (repr() replaces
linebreaks by the \\n-marker. The slicing [1:-1] removes the opening
and closing angular quotation marks that repr adds.
Examples::
>>> s = ('(X (l ",.") (A (O "123") (P (:Text "4") (em "56"))) '
... '(em (m "!?")) (B (Q (em "78") (:Text "9")) (R "abc")) '
... '(n "+-"))')
>>> print(wrap_str_nicely(s))
(X (l ",.") (A (O "123") (P (:Text "4") (em "56"))) (em (m "!?"))
(B (Q (em "78") (:Text "9")) (R "abc")) (n "+-"))
>>> s = ('(X (s) (A (u) (C "One,")) (em (A (C " ") (D "two, ")))'
... '(B (E "three, ") (F "four!") (t))))')
>>> print(wrap_str_nicely(s))
(X (s) (A (u) (C "One,")) (em (A (C " ") (D "two, ")))(B (E "three, ")
(F "four!") (t))))
>>> s = ("[Node('word', 'This'), Node('word', 'is'), "
... "Node('word', 'Buckingham'), Node('word', 'Palace')]")
>>> print(wrap_str_nicely(s))
[Node('word', 'This'), Node('word', 'is'), Node('word', 'Buckingham'),
Node('word', 'Palace')]
>>> s = ("Node('phrase', (Node('word', 'Buckingham'), "
... "Node('blank', ' '), Node('word', 'Palace')))")
>>> print(wrap_str_nicely(s))
Node('phrase', (Node('word', 'Buckingham'), Node('blank', ' '),
Node('word', 'Palace')))
>>> s = ('<hard>Please mark up <foreign lang="de">Stadt\n<lb/></foreign>'
... '<location><foreign lang="de"><em>München</em></foreign> '
... 'in Bavaria</location> in this sentence.</hard>')
>>> print(wrap_str_nicely(s))
<hard>Please mark up <foreign lang="de">Stadt
<lb/></foreign><location><foreign lang="de"><em>München</em></foreign> in Bavaria</location> in this sentence.</hard>
>>> print(wrap_str_nicely(repr(s)[1:-1])) # repr to ignore the linebreaks
<hard>Please mark up <foreign lang="de">Stadt\n<lb/></foreign><location>
<foreign lang="de"><em>München</em></foreign> in Bavaria</location>
in this sentence.</hard>
"""
assert 2 <= tolerance
assert wrap_column > tolerance
if len(s) <= wrap_column or s.rfind('\n') >= 0: return s
parts = []
a = 0
i = wrap_column
while i < len(s):
for ch in wrap_chars:
m = i
while s[m] == ch: m -= 1
if i - m > tolerance // 2:
continue
k = s.rfind(ch, a, m)
probe = s[k + 1]
while k < i and s[k + 1] in wrap_chars and s[k + 1] != ' ': k += 1
if i - k <= tolerance:
parts.append(s[a:k + 1])
a = k + 1
i = k + 1 + wrap_column
break
else:
parts.append(s[a:i])
a = i
i += wrap_column
if a < len(s) - 1: parts.append(s[a:])
return '\n'.join(parts)
[docs]
def printw(s: Any, wrap_column: int = 79, tolerance: int = 24,
wrap_chars: str = ")]>, "):
"""Prints the string or other object nicely wrapped.
See :py:func:`wrap_str_nicely`."""
if isinstance(s, (list, tuple, dict)) and wrap_chars == ")]>, ":
wrap_chars = ",)]> "
s = repr(s)
elif not isinstance(s, str):
s = repr(s)
print(wrap_str_nicely(s, wrap_column, tolerance, wrap_chars))
RX_IDENTIFIER = re.compile(r'\w+')
RX_NON_IDENTIFIER = re.compile(r'\W+')
[docs]
@cython.locals(i=cython.int, delta=cython.int)
def as_identifier(s: str, replacement: str = "_") -> str:
r"""Converts a string to an identifier that matches /\w+/ by
substituting any character not matching /\w/ with the given
replacement string:
>>> as_identifier('EBNF-m')
'EBNF_m'
"""
ident = []
i = 0
while i < len(s):
m = RX_IDENTIFIER.match(s, i)
if m:
ident.append(m.group(0))
rng = m.span(0)
i += rng[1] - rng[0]
m = RX_NON_IDENTIFIER.match(s, i)
if m:
rng = m.span(0)
delta = rng[1] - rng[0]
ident.append(replacement * delta)
i += delta # rng[1] - rng[0]
return ''.join(ident)
NOPE = [] # a list that is empty and is supposed to remain empty
[docs]
@cython.locals(da=cython.int, db=cython.int, a=cython.int, b=cython.int)
def matching_brackets(text: str,
openB: str,
closeB: str,
unmatched: list = NOPE) -> List[Tuple[int, int]]:
"""Returns a list of matching bracket positions. Fills an empty list
passed to parameter `unmatched` with the positions of all
unmatched brackets.
>>> matching_brackets('(a(b)c)', '(', ')')
[(2, 4), (0, 6)]
>>> matching_brackets('(a)b(c)', '(', ')')
[(0, 2), (4, 6)]
>>> unmatched = []
>>> matching_brackets('ab(c', '(', ')', unmatched)
[]
>>> unmatched
[2]
"""
assert not unmatched, \
"Please pass an empty list as unmatched flag, not: " + str(unmatched)
stack, matches = [], []
da = len(openB)
db = len(closeB)
a = text.find(openB)
b = text.find(closeB)
while a >= 0 and b >= 0:
while 0 <= a < b:
stack.append(a)
a = text.find(openB, a + da)
while 0 <= b < a:
try:
matches.append((stack.pop(), b))
except IndexError:
if unmatched is not NOPE: unmatched.append(b)
b = text.find(closeB, b + db)
while b >= 0:
try:
matches.append((stack.pop(), b))
except IndexError:
if unmatched is not NOPE: unmatched.append(b)
b = text.find(closeB, b + db)
if unmatched is not NOPE:
if stack:
unmatched.extend(stack)
elif a >= 0:
unmatched.append(a)
return matches
# see definition of EntityRef in: XML-grammar: XML-grammar, see https://www.w3.org/TR/xml/
RX_ENTITY = re.compile(r'&(?:_|:|[A-Z]|[a-z]|[\u00C0-\u00D6]|[\u00D8-\u00F6]|[\u00F8-\u02FF]'
r'|[\u0370-\u037D]|[\u037F-\u1FFF]|[\u200C-\u200D]|[\u2070-\u218F]'
r'|[\u2C00-\u2FEF]|[\u3001-\uD7FF]|[\uF900-\uFDCF]|[\uFDF0-\uFFFD]'
r'|[\U00010000-\U000EFFFF])(?:_|:|-|\.|[A-Z]|[a-z]|[0-9]|\u00B7'
r'|[\u0300-\u036F]|[\u203F-\u2040]|[\u00C0-\u00D6]|[\u00D8-\u00F6]'
r'|[\u00F8-\u02FF]|[\u0370-\u037D]|[\u037F-\u1FFF]|[\u200C-\u200D]'
r'|[\u2070-\u218F]|[\u2C00-\u2FEF]|[\u3001-\uD7FF]|[\uF900-\uFDCF]'
r'|[\uFDF0-\uFFFD]|[\U00010000-\U000EFFFF])*;')
[docs]
def validate_XML_attribute_value(value: Any) -> str:
"""Validates an XML-attribute value and returns the quoted string-value
if successful. Otherwise, raises a ValueError.
"""
value = str(value)
contains_doublequote = value.find('"') >= 0
if contains_doublequote and value.find("'") >= 0:
raise ValueError(('Illegal XML-attribute value: %s (Cannot be quoted, because '
'both single and double quotation mark are contained in the '
'value. Use entities to avoid this conflict.)') % value)
if value.find('<') >= 0:
raise ValueError(f'XML-attribute value "{value}" must not contain "<"! Change '
f'config-variable "xml_attribute_error_handling" to "fix" to '
f'avoid this error.')
i = value.find('&')
while i >= 0:
if not RX_ENTITY.match(value, i):
raise ValueError('Ampersand.sign "&" not allowed in XML-attribute value '
'unless it is the beginning of an entity: ' + value)
i = value.find('&', i + 1)
return ("'%s'" % value) if contains_doublequote else '"%s"' % value
[docs]
def fix_XML_attribute_value(value: Any) -> str:
"""Returns the quoted XML-attribute value. In case the values
contains illegal characters, like '<', these will be replaced by
XML-entities."""
value = str(value)
value = value.replace('<', '<')
# value = value.replace('>', '>')
i = value.find('&')
while i >= 0:
if not RX_ENTITY.match(value, i):
value = value[:i] + '&' + value[i + 1:]
i = value.find('&', i + 1)
if value.find('"') >= 0:
if value.find("'") >= 0:
value = value.replace("'", "'")
value = "'%s'" % value
else:
value = '"%s"' % value
return value
RX_NON_ASCII = re.compile(r'[^\U00000000-\U000000FF]')
[docs]
def lxml_XML_attribute_value(value: Any) -> str:
"""Makes sure that the attribute value works with the lxml-library,
at the cost of replacing all characters with a code > 256 by
a quesiton mark.
:param value: the original attribute value
:return: the quoted and lxml-compatible attribute value.
"""
value = str(value)
value = RX_NON_ASCII.sub('?', value)
return fix_XML_attribute_value(value)
#######################################################################
#
# type system support
#
#######################################################################
[docs]
def issubtype(sub_type, base_type) -> bool:
"""Returns `True` if sub_type is a subtype of `base_type`.
WARNING: Implementation is somewhat "hackish" and might break
with new Python-versions.
"""
def origin(t) -> tuple:
def fix(t: Any) -> Any:
return {'Dict': dict, 'Tuple': tuple, 'List': list}.get(t, t)
try:
if isinstance(t, (str, bytes)): t = eval(t)
ot = t.__origin__
if ot is Union:
try:
return tuple((fix(a.__origin__) if a.__origin__ is not None else fix(a))
for a in t.__args__)
except AttributeError:
try:
return tuple((fix(a.__origin__) if a.__origin__ is not None else fix(a))
for a in t.__union_args__)
except AttributeError:
return t.__args__
except AttributeError:
if t == 'unicode': # work-around for cython bug
return str,
return fix(t),
return (fix(ot),) if ot is not None else (fix(t),)
true_st = origin(sub_type)
true_bt = origin(base_type)[0]
return any(issubclass(st, true_bt) for st in true_st)
[docs]
def isgenerictype(t):
"""Returns `True` if `t` is a generic type.
WARNING: This is very "hackish". Caller must make sure that `t`
actually is a type!"""
return str(t).endswith(']')
#######################################################################
#
# loading and compiling
#
#######################################################################
RX_FILEPATH = re.compile(r'[^ \t][^\n\t?*=]+(?<![ \t])') # r'[\w/:. \\]+'
[docs]
def load_if_file(text_or_file) -> str:
"""
Reads and returns content of a text-file if parameter
`text_or_file` is a file name (i.e. a single line string),
otherwise (i.e. if `text_or_file` is a multi-line string)
`text_or_file` is returned.
"""
if is_filename(text_or_file):
try:
with open(text_or_file, encoding="utf-8") as f:
content = f.read()
return content
except FileNotFoundError:
if RX_FILEPATH.fullmatch(text_or_file):
raise FileNotFoundError(
'File not found or not a valid filepath or URL: "%s".\n'
'(If "%s" was not meant to be a file name then, please, add '
'an empty line to distinguish source data from a file name.)'
% (text_or_file, text_or_file))
else:
return text_or_file
else:
return text_or_file
[docs]
def is_python_code(text_or_file: str) -> bool:
"""
Checks whether 'text_or_file' is python code or the name of a file that
contains python code.
"""
if is_filename(text_or_file):
return text_or_file[-3:].lower() == '.py'
try:
ast.parse(text_or_file)
return True
except (SyntaxError, ValueError, OverflowError):
pass
return False
def has_fenced_code(text_or_file: str, info_strings=('ebnf', 'test')) -> bool:
"""
Checks whether `text_or_file` contains fenced code blocks, which are
marked by one of the given info strings.
See https://spec.commonmark.org/0.28/#fenced-code-blocks for more
information on fenced code blocks in common mark documents.
"""
if is_filename(text_or_file):
with open(text_or_file, 'r', encoding='utf-8') as f:
markdown = f.read()
else:
markdown = text_or_file
if markdown.find('\n~~~') < 0 and markdown.find('\n```') < 0:
return False
if isinstance(info_strings, str):
info_strings = (info_strings,)
fence_tmpl = r'\n(?:(?:``[`]*[ ]*(?:%s)(?=[ .\-:\n])[^`\n]*\n)' + \
r'|(?:~~[~]*[ ]*(?:%s)(?=[ .\-:\n])[\n]*\n))'
label_re = '|'.join('(?:%s)' % matched_string for matched_string in info_strings)
rx_fence = re.compile(fence_tmpl % (label_re, label_re), flags=re.IGNORECASE)
for match in rx_fence.finditer(markdown):
matched_string = re.match(r'\n`+|\n~+', match.group(0)).group(0)
if markdown.find(matched_string, match.end()) >= 0:
return True
else:
break
return False
[docs]
def md5(*txt):
"""
Returns the md5-checksum for `txt`. This can be used to test if
some piece of text, for example a grammar source file, has changed.
"""
md5_hash = hashlib.md5()
for t in txt:
md5_hash.update(t.encode('utf8'))
return md5_hash.hexdigest()
[docs]
def compile_python_object(python_src: str, catch_obj="DSLGrammar") -> Any:
"""
Compiles the python source code and returns the (first) object
the name of which is either equal to or matched by ``catch_obj_regex``.
If catch_obj is the empty string, the namespace dictionary will be returned.
"""
code = compile(python_src, '<string>', 'exec')
namespace = {} # type: Dict[str, Any]
exec(code, namespace) # safety risk?
if catch_obj:
if isinstance(catch_obj, str):
try:
obj = namespace[catch_obj]
return obj
except KeyError:
catch_obj = re.compile(catch_obj)
matches = [key for key in namespace if catch_obj.fullmatch(key)]
if len(matches) < 1:
raise ValueError("No object matching /%s/ defined in source code." %
catch_obj.pattern)
elif len(matches) > 1:
raise ValueError("Ambiguous matches for %s : %s" %
(str(catch_obj), str(matches)))
return namespace[matches[0]] if matches else None
else:
return namespace
#######################################################################
#
# text services
#
#######################################################################
[docs]
@cython.locals(i=cython.int)
@functools.lru_cache()
def linebreaks(text: Union[StringView, str]) -> List[int]:
"""
Returns a list of indices all line breaks in the text.
"""
assert isinstance(text, (StringView, str)), \
"Type %s of `text` is not a string type!" % str(type(text))
lbr = [-1]
i = text.find('\n', 0)
while i >= 0:
lbr.append(i)
i = text.find('\n', i + 1)
lbr.append(len(text))
return lbr
[docs]
@cython.locals(line=cython.int, column=cython.int)
def line_col(lbreaks: List[int], pos: cython.int) -> Tuple[cython.int, cython.int]:
"""
Returns the position within a text as (line, column)-tuple based
on a list of all line breaks, including -1 and EOF.
"""
if not lbreaks and pos >= 0:
return 0, pos
if pos < 0 or pos > lbreaks[-1]: # one character behind EOF is still an allowed position!
raise ValueError('Position %i outside text of length %s !' % (pos, lbreaks[-1]))
line = bisect.bisect_left(lbreaks, pos)
column = pos - lbreaks[line - 1]
return line, column
[docs]
@cython.returns(cython.int)
@cython.locals(line=cython.int, column=cython.int, i=cython.int)
def text_pos(text: Union[StringView, str],
line: int, column: int,
lbreaks: List[int] = []) -> int:
"""
Returns the text-position for a given line and column or -1 if the line
and column exceed the size of the text.
"""
if lbreaks:
try:
return lbreaks[line] + column - 1
except IndexError:
return -1
else:
i = 0
while line > 0 and i >= 0:
i = text.find('\n', i + 1)
line -= 1
return i + column - 1
#######################################################################
#
# smart lists and multi-keyword tables
#
#######################################################################
# def smart_list(arg: Union[str, Iterable[T]]) -> Union[Sequence[str], Sequence[T]]:
[docs]
def smart_list(arg: Union[str, Iterable, Any]) -> Union[Sequence, Set]:
"""
Returns the argument as list, depending on its type and content.
If the argument is a string, it will be interpreted as a list of
comma separated values, trying ';', ',', ' ' as possible delimiters
in this order, e.g.
>>> smart_list('1; 2, 3; 4')
['1', '2, 3', '4']
>>> smart_list('2, 3')
['2', '3']
>>> smart_list('a b cd')
['a', 'b', 'cd']
If the argument is a collection other than a string, it will be
returned as is, e.g.
>>> smart_list((1, 2, 3))
(1, 2, 3)
>>> smart_list({1, 2, 3})
{1, 2, 3}
If the argument is another iterable than a collection, it will
be converted into a list, e.g.
>>> smart_list(i for i in {1,2,3})
[1, 2, 3]
Finally, if none of the above is true, the argument will be
wrapped in a list and returned, e.g.
>>> smart_list(125)
[125]
"""
if isinstance(arg, str):
for delimiter in (';', ','):
lst = arg.split(delimiter)
if len(lst) > 1:
return [s.strip() for s in lst]
return [s.strip() for s in arg.strip().split(' ')]
elif isinstance(arg, Sequence) or isinstance(arg, Set):
return arg
elif isinstance(arg, Iterable):
return list(arg)
else:
return [arg]
[docs]
def expand_table(compact_table: Dict) -> Dict:
"""
Expands a table by separating keywords that are tuples or strings
containing comma separated words into single keyword entries with
the same values. Returns the expanded table.
Example::
>>> expand_table({"a, b": 1, ('d','e','f'):5, "c":3})
{'a': 1, 'b': 1, 'd': 5, 'e': 5, 'f': 5, 'c': 3}
"""
expanded_table = {} # type: Dict
keys = list(compact_table.keys())
for key in keys:
value = compact_table[key]
for k in smart_list(key):
if k in expanded_table:
raise KeyError('Key "%s" used more than once in compact table!' % key)
expanded_table[k] = value
return expanded_table
#######################################################################
#
# JSON RPC support
#
#######################################################################
JSON_Type: TypeAlias = Union[Dict, Sequence, str, int, float, None]
JSON_Dict: TypeAlias = Dict[str, JSON_Type]
[docs]
class JSONstr:
"""
JSONStr is a special type that encapsulates already serialized
json-chunks in json object-trees. ``json_dumps`` will insert the content
of a JSONStr-object literally, rather than serializing it as other
objects.
"""
__slots__ = ['serialized_json']
def __repr__(self):
return self.serialized_json
def __init__(self, serialized_json: str):
assert isinstance(serialized_json, str)
self.serialized_json = serialized_json
[docs]
class JSONnull:
"""JSONnull is a special type that is serialized as ``null`` by ``json_dumps``.
This can be used whenever it is inconvenient to use ``None`` as the null-value.
"""
__slots__ = []
def __repr__(self):
return 'null'
# the following string-escaping tables and procedures have been
# copy-pasted and slightly adapted from the std-library
ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]')
ESCAPE_DCT = {'\\': '\\\\', '"': '\\"', '\x08': '\\b', '\x0c': '\\f', '\n': '\\n', '\r': '\\r',
'\t': '\\t', '\x00': '\\u0000', '\x01': '\\u0001', '\x02': '\\u0002',
'\x03': '\\u0003', '\x04': '\\u0004', '\x05': '\\u0005', '\x06': '\\u0006',
'\x07': '\\u0007', '\x0b': '\\u000b', '\x0e': '\\u000e', '\x0f': '\\u000f',
'\x10': '\\u0010', '\x11': '\\u0011', '\x12': '\\u0012', '\x13': '\\u0013',
'\x14': '\\u0014', '\x15': '\\u0015', '\x16': '\\u0016', '\x17': '\\u0017',
'\x18': '\\u0018', '\x19': '\\u0019', '\x1a': '\\u001a', '\x1b': '\\u001b',
'\x1c': '\\u001c', '\x1d': '\\u001d', '\x1e': '\\u001e', '\x1f': '\\u001f'}
def json_encode_string(s: str) -> str:
return '"' + ESCAPE.sub(lambda m: ESCAPE_DCT[m.group(0)], s) + '"'
[docs]
def json_dumps(obj: JSON_Type, *, cls=json.JSONEncoder, partially_serialized: bool = False) -> str:
"""Returns json-object as string. Other than the standard-library's
`json.dumps()`-function `json_dumps` allows to include alrady serialzed
parts (in the form of JSONStr-objects) in the json-object. Example::
>>> already_serialized = '{"width":640,"height":400"}'
>>> literal = JSONstr(already_serialized)
>>> json_obj = {"jsonrpc": "2.0", "method": "report_size", "params": literal, "id": None}
>>> json_dumps(json_obj, partially_serialized=True)
'{"jsonrpc":"2.0","method":"report_size","params":{"width":640,"height":400"},"id":null}'
:param obj: A json-object (or a tree of json-objects) to be serialized
:param cls: The class of a custom json-encoder berived from ``json.JSONEncoder``
:param partially_serialized: If True, :py:class:`JSONStr`-objects within the json tree
will be encoded (by inserting their content). If False, :py:class:`JSONStr`-objects
will raise a TypeError, but encoding will be faster.
:return: The string-serialized form of the json-object.
"""
# def serialize(obj) -> Iterator[str]:
# if isinstance(obj, str):
# yield json_encode_string(obj)
# elif isinstance(obj, dict):
# buf = '{'
# for k, v in obj.items():
# yield buf + '"' + k + '":'
# yield from serialize(v)
# buf = ','
# yield '}'
# elif isinstance(obj, (list, tuple)):
# buf = '['
# for item in obj:
# yield buf
# yield from serialize(item)
# buf = ','
# yield ']'
# elif obj is True:
# yield 'true'
# elif obj is False:
# yield 'false'
# elif isinstance(obj, int):
# # NOTE: test for int must follow test for bool, because True and False
# # are treated as instances of int as well by Python
# yield str(obj)
# elif isinstance(obj, float):
# yield str(obj)
# elif obj is None:
# yield 'null'
# elif isinstance(obj, JSONStr):
# yield obj.serialized_json
# else:
# yield from serialize(custom_encoder.default(obj))
def serialize(obj) -> List[str]:
if isinstance(obj, str):
return [json_encode_string(obj)]
elif isinstance(obj, dict):
if obj:
r = ['{']
for k, v in obj.items():
r.append('"' + k + '":')
r.extend(serialize(v))
r.append(',')
r[-1] = '}'
return r
return ['{}']
elif isinstance(obj, (list, tuple)):
if obj:
r = ['[']
for item in obj:
r.extend(serialize(item))
r.append(',')
r[-1] = ']'
return r
return ['[]']
elif obj is True:
return ['true']
elif obj is False:
return ['false']
elif obj is None:
return ['null']
elif isinstance(obj, (int, float)):
# NOTE: test for int must follow test for bool, because True and False
# are treated as instances of int as well by Python
return[repr(obj)]
elif isinstance(obj, JSONstr):
return [obj.serialized_json]
elif obj is JSONnull or isinstance(obj, JSONnull):
return ['null']
return serialize(custom_encoder.default(obj))
if partially_serialized:
custom_encoder = cls()
return ''.join(serialize(obj))
else:
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if obj is JSONnull or isinstance(obj, JSONnull):
return None
return cls.default(self, obj)
return json.dumps(obj, cls=MyEncoder, indent=None, separators=(',', ':'))
[docs]
def json_rpc(method: str,
params: JSON_Type = [],
ID: Optional[int] = None,
partially_serialized: bool = True) -> str:
"""Generates a JSON-RPC-call string for `method` with parameters `params`.
:param method: The name of the rpc-function that shall be called
:param params: A json-object representing the parameters of the call
:param ID: An ID for the json-rpc-call or `None`
:param partially_serialized: If True, the `params`-object may contain
already serialized parts in form of `JSONStr`-objects.
If False, any `JSONStr`-objects will lead to a TypeError.
:return: The string-serialized form of the json-object.
"""
rpc = {"jsonrpc": "2.0", "method": method, "params": params}
if ID is not None:
rpc['id'] = ID
return json_dumps(rpc, partially_serialized=partially_serialized)
[docs]
def pp_json(obj: JSON_Type, *, cls=json.JSONEncoder) -> str:
"""Returns json-object as pretty-printed string. Other than the standard-library's
`json.dumps()`-function `pp_json` allows to include already serialized
parts (in the form of JSONStr-objects) in the json-object. Example::
>>> already_serialized = '{"width":640,"height":400"}'
>>> literal = JSONstr(already_serialized)
>>> json_obj = {"jsonrpc": "2.0", "method": "report_size", "params": literal, "id": None}
>>> print(pp_json(json_obj))
{
"jsonrpc": "2.0",
"method": "report_size",
"params": {"width":640,"height":400"},
"id": null}
:param obj: A json-object (or a tree of json-objects) to be serialized
:param cls: The class of a custom json-encoder derived from `json.JSONEncoder`
:return: The pretty-printed string-serialized form of the json-object.
"""
custom_encoder = cls()
def serialize(obj, indent: str) -> List[str]:
if isinstance(obj, str):
if obj.find('\n') >= 0:
lines = obj.split('\n')
pretty_str = json_encode_string(lines[0]) + '\n' \
+ '\n'.join(indent + json_encode_string(line) for line in lines[1:])
return [pretty_str]
else:
return [json_encode_string(obj)]
elif isinstance(obj, dict):
if obj:
if len(obj) == 1:
k, v = next(iter(obj.items()))
if not isinstance(v, (dict, list, tuple)):
r = ['{"' + k + '": ']
r.extend(serialize(v, indent + ' '))
r.append('}')
return r
r = ['{\n' + indent + ' ']
for k, v in obj.items():
r.append('"' + k + '": ')
r.extend(serialize(v, indent + ' '))
r.append(',\n' + indent + ' ')
r[-1] = '}'
return r
return ['{}']
elif isinstance(obj, (list, tuple)):
if obj:
r = ['[']
for item in obj:
r.extend(serialize(item, indent + ' '))
r.append(',')
r[-1] = ']'
return r
return ['[]']
elif obj is True:
return ['true']
elif obj is False:
return ['false']
elif obj is None:
return ['null']
elif isinstance(obj, (int, float)):
# NOTE: test for int must follow test for bool, because True and False
# are treated as instances of int as well by Python
return [repr(obj)]
elif isinstance(obj, JSONstr):
return [obj.serialized_json]
elif isinstance(obj, JSONnull) or obj is JSONnull:
return ['null']
return serialize(custom_encoder.default(obj), indent)
return ''.join(serialize(obj, ''))
[docs]
def pp_json_str(jsons: str) -> str:
"""Pretty-prints and already serialized (but possibly ugly-printed)
json object in a well-readable form. Syntactic sugar for:
`pp_json(json.loads(jsons))`."""
return pp_json(json.loads(jsons))
#######################################################################
#
# concurrent execution (wrappers for concurrent.futures)
#
#######################################################################
[docs]
class SingleThreadExecutor(concurrent.futures.Executor):
r"""SingleThreadExecutor is a replacement for
concurrent.future.ProcessPoolExecutor and
concurrent.future.ThreadPoolExecutor that executes any submitted
task immediately in the submitting thread. This helps to avoid
writing extra code for the case that multithreading or
multiprocesssing has been turned off in the configuration. To do
so is helpful for debugging.
It is not recommended to use this in asynchronous code or code that
relies on the submit()- or map()-method of executors to return quickly.
"""
[docs]
def submit(self, fn, *args, **kwargs) -> concurrent.futures.Future:
"""Run function "fn" with the given args and kwargs synchronously
without multithreading or multiprocessing."""
future = concurrent.futures.Future()
try:
result = fn(*args, **kwargs)
future.set_result(result)
except BaseException as e:
future.set_exception(e)
return future
[docs]
@functools.lru_cache(None)
def multiprocessing_broken() -> str:
"""Returns an error message, if, for any reason multiprocessing is not safe
to be used. For example, multiprocessing does not work with
PyInstaller (under Windows) or GraalVM.
"""
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
err_msg = "Multiprocessing turned off for PyInstaller-bundled script."
print(err_msg)
return err_msg
return ""
[docs]
def instantiate_executor(allow_parallel: bool,
preferred_executor: Type[concurrent.futures.Executor],
*args, **kwargs) -> concurrent.futures.Executor:
"""Instantiates an Executor of a particular type, if the value of the
configuration variable 'debug_parallel_execution' allows to do so.
Otherwise, a surrogate executor will be returned.
If 'allow_parallel` is False, a SingleThreadExecutor will be instantiated,
regardless of the preferred_executor and any configuration values.
"""
if allow_parallel:
mode = get_config_value('debug_parallel_execution') # type: str
if mode == "commandline":
options = [arg for arg in sys.argv if arg[:2] == '--'] # type: List[str]
if '--singlethread' in options: mode = 'singlethread'
elif '--multithreading' in options: mode = 'multithreading'
else: mode = 'multiprocessing'
if mode == "singlethread":
return SingleThreadExecutor()
elif mode == "multithreading" or (mode == "multiprocessing" and multiprocessing_broken()):
if issubclass(preferred_executor, concurrent.futures.ProcessPoolExecutor):
return concurrent.futures.ThreadPoolExecutor(*args, **kwargs)
else:
assert mode == "multiprocessing", \
'Config variable "debug_parallel_execution" as illegal value "%s"' % mode
return preferred_executor(*args, **kwargs)
return SingleThreadExecutor()
[docs]
def cpu_count() -> int:
"""Returns the number of cpus that are accessible to the current process
or 1 if the cpu count cannot be determined."""
try:
return len(os.sched_getaffinity(0)) or 1
except AttributeError:
return os.cpu_count() or 1
#######################################################################
#
# initialization
#
#######################################################################
try:
if sys.stdout.encoding.upper() != "UTF-8": # and platform.system() == "Windows":
# make sure that `print()` does not raise an error on
# non-ASCII characters:
# sys.stdout = cast(io.TextIOWrapper, codecs.getwriter("utf-8")(cast(
# io.BytesIO, cast(io.TextIOWrapper, sys.stdout).detach())))
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), sys.stdout.encoding, 'replace')
except AttributeError:
# somebody has already taken care of this !?
pass