Source code for doitoml.dsl

"""Domain-specific language for declarative ``doit`` task generation."""

import abc
import fnmatch
import json
import os
import re
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, cast

from doitoml.constants import FNMATCH_WILDCARDS

from .errors import DslError
from .types import Strings

if TYPE_CHECKING:
    from .doitoml import DoiTOML
    from .sources._config import ConfigSource
    from .sources._source import Source


class DSL:
    """A base class for a ``doitoml`` DSL plugin.

    Subclasses advertise a ``pattern`` identifying the tokens they handle and
    implement :meth:`transform_token` to expand a matched token.

    This class does not use :class:`abc.ABCMeta`, so the abstract markers below
    document the contract but are not enforced at instantiation time.
    """

    #: a reference to the parent
    doitoml: "DoiTOML"

    #: the rank of the DSL
    rank = 100

    def __init__(self, doitoml: "DoiTOML") -> None:
        """Create a DSL and remember its parent.

        :param doitoml: the parent object, stored as ``self.doitoml``
        """
        self.doitoml = doitoml

    # ``abc.abstractproperty`` has been deprecated since Python 3.3: stacking
    # ``property`` on top of ``abstractmethod`` is the supported equivalent.
    @property
    @abc.abstractmethod
    def pattern(self) -> re.Pattern[str]:
        """Advertise the regular expression that will uniquely identify this DSL."""

    @abc.abstractmethod
    def transform_token(
        self,
        source: "ConfigSource",
        match: re.Match[str],
        raw_token: str,
        **kwargs: Any,
    ) -> Strings:
        """Transform a token into one or more strings.

        :param source: the configuration source the token came from
        :param match: a match object for this DSL's :attr:`pattern`
        :param raw_token: the token text, before any transformation
        :param kwargs: extra keyword arguments; unused by this base class
        :returns: the expanded strings
        """
class PathRef(DSL):
    """Resolve ``::ref`` and ``::prefix::ref`` tokens to previously-found values."""

    #: paths go before all other built-in DSL
    rank = 80

    pattern = re.compile(r"^::((?P<prefix>[^:]*)::)?(?P<ref>[^:]+)$")

    def transform_token(
        self,
        source: "ConfigSource",
        match: re.Match[str],
        raw_token: str,
        **kwargs: Any,
    ) -> Strings:
        """Look up a named reference in the parent's config.

        When the token carries no explicit prefix, the prefix of ``source`` is
        used. A prefix containing any of ``FNMATCH_WILDCARDS`` is matched
        against the sorted names in ``config.sources``; otherwise it is used
        as-is.

        Each candidate ``(prefix, ref)`` key is looked up in ``config.paths``
        and then ``config.tokens``; when both hold the key, the value from
        ``config.tokens`` is the one kept.

        :returns: the concatenated values for every prefix that resolved, or
            ``None`` when nothing was found
        """
        parts = match.groupdict()
        name: str = parts["ref"]
        wanted = parts["prefix"]
        if wanted is None:
            wanted = source.prefix
        config = self.doitoml.config

        has_wildcard = any(char in wanted for char in FNMATCH_WILDCARDS)
        if has_wildcard:
            candidates = fnmatch.filter(sorted(config.sources), wanted)
        else:
            candidates = [wanted]

        resolved: Dict[str, List[str]] = {}
        for candidate in candidates:
            key = (candidate, name)
            # later tables win: a hit in ``tokens`` replaces one from ``paths``
            for table in [config.paths, config.tokens]:
                found = table.get(key)
                if found is not None:
                    resolved[candidate] = found

        if not resolved:
            return None  # type: ignore

        return sum(resolved.values(), [])
class EnvReplacer(DSL):
    """Expand UNIX-style ``${NAME}`` references inside a token."""

    #: numerically between ``PathRef`` (80) and the ``DSL`` default (100)
    rank = 90

    pattern = re.compile(r"\$\{([^\}]+)\}")

    def _replacer(self, match: re.Match) -> str:
        """Return the parent's value for the variable named by the first group."""
        variable = match.group(1)
        return self.doitoml.get_env(variable)

    def transform_token(
        self,
        source: "ConfigSource",
        match: re.Match[str],
        raw_token: str,
        **kwargs: Any,
    ) -> Strings:
        """Substitute every ``${NAME}`` in the raw token.

        Each occurrence is replaced with whatever ``self.doitoml.get_env``
        returns for ``NAME``.

        :returns: a single-item list holding the expanded token
        """
        expanded = self.pattern.sub(self._replacer, raw_token)
        return [expanded]
class Globber(DSL):
    """Expand ``:glob::`` and ``:rglob::`` tokens into matching paths."""

    pattern = re.compile(r"^:(?P<kind>(r?glob))::(?P<rest>:{0,2}.*)$")

    def transform_token(
        self,
        source: "ConfigSource",
        match: re.Match[str],
        raw_token: str,
        **kwargs: Any,
    ) -> Strings:
        """Expand a token to zero or more POSIX path strings based on (r)glob(s).

        Chunks are delimited by ``::``. The first chunk is a root path, joined
        onto the directory containing ``source.path`` and resolved. Each
        following chunk is either a glob pattern or starts with a prefix:

        - ``!``: a regular expression; any path it matches is excluded.
          Excludes are searched against the path relative to the directory
          containing ``source.path``.
        - ``/s/``: expects two following chunks:

          - the first is a regular expression to `find`
          - the next is the `replacement` string

          Replacers are applied to the full POSIX path.

        Order does not matter: all excludes and replacers are applied `after`
        all matches are expanded.

        :returns: the sorted, de-duplicated path strings
        """
        parts = match.groupdict()
        kind = cast(str, parts["kind"])
        rest = cast(str, parts["rest"])

        root, remainder = rest.split("::", 1)
        root_path = (source.path.parent / root).resolve()
        expand = root_path.rglob if kind != "glob" else root_path.glob

        found: List[Path] = []
        excludes: List[re.Pattern[str]] = []
        replacers: List[Tuple[re.Pattern[str], str]] = []

        # reversed so chunks can be consumed in their original order with ``pop()``
        pending = remainder.split("::")[::-1]
        while pending:
            chunk = pending.pop()
            if chunk.startswith("!"):
                excludes.append(re.compile(chunk[1:]))
            elif chunk.startswith("/s/"):
                # the find pattern and the replacement are the next two chunks
                find_src = pending.pop()
                replacement = pending.pop()
                replacers.append((re.compile(find_src), replacement))
            else:
                found.extend(expand(chunk))

        config_dir = source.path.parent.as_posix()
        results = set()
        for path in found:
            posix = path.as_posix()
            if excludes:
                relative = Path(os.path.relpath(str(posix), config_dir)).as_posix()
                if relative and any(ex.search(relative) for ex in excludes):
                    continue
            for finder, replacement in replacers:
                posix = finder.sub(replacement, posix)
            results.add(posix)

        return sorted(results)
class Getter(DSL):
    """A wrapper for known parsers.

    Handles tokens of the form ``:get[|default]::parser::path::bit::bit...``,
    where ``parser`` is one of the names in
    ``self.doitoml.entry_points.parsers``.
    """

    _pattern: re.Pattern[str]

    def __init__(self, doitoml: "DoiTOML") -> None:
        """Initialize and pre-calculate the pattern.

        :param doitoml: the parent object, whose ``entry_points.parsers``
            provides the parser names accepted by the pattern
        """
        super().__init__(doitoml)
        keys = sorted(self.doitoml.entry_points.parsers.keys())
        # parser names must match literally: escape any regex metacharacters
        parser_names = "|".join(re.escape(key) for key in keys)
        self._pattern = re.compile(
            r"^:get(?P<default>\|[^:]*)?::(?P<parser>"
            + parser_names
            + r")::(?P<path>.+?)::(?P<rest>:{0,2}.*)$",
        )

    @property
    def pattern(self) -> re.Pattern[str]:
        """Load the pre-calculated pattern."""
        return self._pattern

    def transform_token(
        self,
        source: "ConfigSource",
        match: re.Match[str],
        raw_token: str,
        **kwargs: Any,
    ) -> Strings:
        """Get a value from a parseable file, cast it to a string.

        All extra items are passed as positional arguments to the Source.

        When the source cannot be found and the token carries a ``|default``,
        the default (without the leading ``|``) is returned instead.

        :returns: a list of strings: a ``str`` value is returned as-is;
            ``dict``, ``int``, ``float`` and ``bool`` values are JSON-encoded;
            any other value is iterated, JSON-encoding each non-``str`` item
        :raises DslError: if the source cannot be found and there is no default
        """
        try:
            new_source, bits = self.get_source_with_key(source, match, raw_token)
        except DslError:
            default = match.groupdict()["default"]
            if default:
                # drop the leading ``|`` that introduces the default
                return [default[1:]]
            raise

        new_value = new_source.get(bits)

        if isinstance(new_value, str):
            return [new_value]

        if isinstance(new_value, (dict, int, float, bool)):
            return [json.dumps(new_value)]

        return [x if isinstance(x, str) else json.dumps(x) for x in new_value]

    def get_source_with_key(
        self,
        source: "ConfigSource",
        match: re.Match[str],
        raw_token: str,
    ) -> Tuple["Source", List[str]]:
        """Find a raw source and its bits.

        :param source: the config source; ``path`` is resolved against the
            directory containing ``source.path``
        :param match: a match of :attr:`pattern`
        :param raw_token: the original token (unused here)
        :returns: the parsed source and the ``::``-delimited bits after the path
        :raises DslError: if the parser is unknown or the path does not exist
        """
        groups = match.groupdict()
        path: str = groups["path"]
        rest: str = groups["rest"]
        # an empty ``rest`` means "no bits", not a single empty bit
        bits: List[str] = rest.split("::") if rest else []

        # find the parser
        parser_name: str = groups["parser"]
        parser = self.doitoml.entry_points.parsers.get(parser_name)
        if parser is None:  # pragma: no cover
            # report the requested name: ``parser`` itself is always None here
            message = f"parser {parser_name} is not supported"
            raise DslError(message)

        get_path = (source.path.parent / path).resolve()

        if not get_path.exists():
            message = f"{get_path} does not exist, can't get {bits}"
            raise DslError(message)

        new_source = parser(get_path)
        return new_source, bits