"""Handles discovering, loading, and normalizing configuration."""
import json
import os
import warnings
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Tuple,
Type,
Union,
cast,
)
from .constants import (
DEFAULTS,
DOIT_TASK,
DOITOML_META,
NAME,
)
from .errors import (
ActionError,
ConfigError,
DoitomlError,
MissingDependencyError,
NoActorError,
NoConfigError,
NoTemplaterError,
PrefixError,
SkipError,
TemplaterError,
UnresolvedError,
UnsafePathError,
)
from .schema._gen._doit_v0_schema import DoitomlSchema
from .sources._config import ConfigParser, ConfigSource
from .sources._source import Source
from .types import (
Action,
LogPaths,
PathOrString,
PathOrStrings,
Paths,
PrefixedStrings,
PrefixedTaskGenerator,
PrefixedTasks,
PrefixedTemplates,
Strings,
Task,
)
from .utils.json import to_json
from .utils.path import normalize_path
if TYPE_CHECKING:
from .doitoml import DoiTOML
Parsers = Dict[str, Type[Source]]
ConfigParsers = Dict[Tuple[str], Type[ConfigSource]]
ConfigSources = Dict[str, ConfigSource]
EnvDict = Dict[str, str]
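# retry budget for resolving cross- (but not circular) references between
# env vars, paths, and tokens during ``Config.initialize``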
RETRIES = 11
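# A rough usage sketch (the ``DoiTOML`` instance and the config path below are
# assumptions for illustration, not prescribed values):
#
#     config = Config(doitoml, [Path("pyproject.toml")])
#     config.initialize()
#     normalized = config.to_dict()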
class Config:
"""A composite configuration loaded from multiple ConfigSource."""
doitoml: "DoiTOML"
config_paths: Paths
sources: ConfigSources
tasks: PrefixedTasks
env: EnvDict
paths: PrefixedStrings
templates: PrefixedTemplates
tokens: PrefixedStrings
update_env: Optional[bool]
fail_quietly: Optional[bool]
discover_config_paths: Optional[bool]
validate: Optional[bool]
safe_paths: List[str]
def __init__( # noqa: PLR0913
self,
doitoml: "DoiTOML",
config_paths: Paths,
*,
update_env: Optional[bool] = None,
fail_quietly: Optional[bool] = None,
discover_config_paths: Optional[bool] = None,
validate: Optional[bool] = None,
safe_paths: Optional[List[str]] = None,
) -> None:
"""Create empty configuration and discover sources."""
self.validate = validate
self.doitoml = doitoml
self.config_paths = config_paths
self.sources = {}
self.tasks = {}
self.paths = {}
self.env = {}
self.tokens = {}
self.templates = {}
self.update_env = update_env
self.fail_quietly = fail_quietly
self.discover_config_paths = discover_config_paths
self.safe_paths = [normalize_path(p) for p in safe_paths or []]
def to_dict(self) -> DoitomlSchema:
"""Return a normalized subset of config data."""
env = dict(**self.env)
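# values already present in the live environment take precedence over
# configured values in the serialized view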
env.update(os.environ)
as_dict: DoitomlSchema = to_json(
{
"env": env,
"tokens": {":".join(k): v for k, v in self.tokens.items()},
"paths": {":".join(k): v for k, v in self.paths.items()},
"templates": self.templates,
"tasks": {":".join(map(str, k)): v for k, v in self.tasks.items()},
},
)
return as_dict
def initialize(self) -> None:
"""Perform a few passes to configure everything."""
self.sources = self.find_config_sources()
# load top-level config values from the first config
top_config = [*self.sources.values()][0]
for key in DEFAULTS.ALL_FROM_FIRST_CONFIG:
if getattr(self, key, None) is None:
setattr(self, key, top_config.raw_config.get(key, True))
unresolved_env: EnvDict = {}
unresolved_paths: PrefixedStrings = {}
unresolved_tokens: PrefixedStrings = {}
retry = RETRIES
# ... allow some retries for cross (but not circular) references
while retry:
retry -= 1
try:
unresolved_env = self.init_env(unresolved_env, RETRIES)
unresolved_paths = self.init_paths(unresolved_paths, RETRIES)
unresolved_tokens = self.init_tokens(unresolved_tokens, RETRIES)
except UnresolvedError: # pragma: no cover
pass
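# anything still unresolved is carried over and retried on the next pass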
if not any([unresolved_env, unresolved_paths, unresolved_tokens]):
break
if not retry:
message = [f"Gave up after {RETRIES} retries!"]
if unresolved_env:
message += [
f"Failed to resolve environment variables: "
f"{pformat(unresolved_env)}",
]
if unresolved_paths:
message += [f"Failed to resolve paths: {pformat(unresolved_paths)}"]
if unresolved_tokens:
message += [
f"Failed to resolve tokens: {pformat(unresolved_tokens)}",
]
raise UnresolvedError("\n".join(message))
# ... then templates
self.init_templates()
# ... then find the tasks
self.init_tasks()
self.maybe_validate()
def maybe_validate(self) -> None:
"""Validate if requested, or."""
if self.validate is False:
return None
jsonschema_error = None
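# validation is best-effort: a missing optional validator dependency is
# downgraded to a warning below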
try:
from .schema.validator import latest
return latest.validate(self.to_dict())
except MissingDependencyError as err:
jsonschema_error = str(err)
if jsonschema_error: # pragma: no cover
message = (
f"Validation was requested, but cannot validate: {jsonschema_error}"
)
warnings.warn(message, stacklevel=1)
return None # pragma: no cover
def find_config_sources(self) -> ConfigSources:
"""Find all directly and referenced configuration sources."""
config_sources: ConfigSources = {}
unchecked_paths = [*self.config_paths]
if not unchecked_paths:
unchecked_paths += self.find_fallback_config_sources()
if not unchecked_paths:
path = self.doitoml.cwd / DEFAULTS.CONFIG_PATH
if path.exists():
unchecked_paths += [path]
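# with no explicit ``safe_paths``, the parent of the first config path
# becomes the safe root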
if unchecked_paths and not self.safe_paths:
self.safe_paths = [normalize_path(unchecked_paths[0].parent)]
while unchecked_paths:
config_path = unchecked_paths.pop(0)
config_source = self.load_config_source(
Path(self.check_safe_path(config_path)),
)
self.find_one_config_source(config_source, config_sources)
if not config_sources:
message = "No ``doitoml`` config found"
raise NoConfigError(message)
return config_sources
def find_one_config_source(
self,
config_source: ConfigSource,
sources: ConfigSources,
) -> None:
"""Discover a config source and its potentially-nested extra sources."""
if config_source in sources.values() or not config_source.raw_config:
return
self.claim_prefix(config_source, sources)
for extra_source in config_source.extra_config_sources(self.doitoml):
self.find_one_config_source(extra_source, sources)
def find_fallback_config_sources(self) -> Paths:
"""Find sources."""
unchecked = []
if self.discover_config_paths is not False:
for parser in self.doitoml.entry_points.config_parsers.values():
for well_known in parser.well_known:
path = self.doitoml.cwd / well_known
if path.exists():
unchecked += [path]
return unchecked
def claim_prefix(self, source: ConfigSource, sources: ConfigSources) -> None:
"""Claim a prefix for a source."""
prefix = source.prefix
prefix_claimed_by = sources.get(prefix)
if prefix_claimed_by:
message = f"{source} cannot claim prefix '{prefix}': {prefix_claimed_by}"
raise PrefixError(message)
sources[prefix] = source
def get_config_parser(self, config_path: Path) -> ConfigParser:
"""Find the config parser for a path."""
config_parsers = self.doitoml.entry_points.config_parsers
tried = []
for config_parser in config_parsers.values():
if config_parser.pattern.search(config_path.name):
return config_parser
tried += [config_parser.pattern.pattern]
message = f"Cannot load {config_path}: expected one of {tried}"
raise ConfigError(message)
def load_config_source(self, config_path: Path) -> ConfigSource:
"""Maybe load a configuration source."""
config_parser = self.get_config_parser(config_path)
return config_parser(config_path)
def init_env(self, unresolved_env: EnvDict, retries: int) -> EnvDict:
"""Initialize the global environment variable."""
for source in self.sources.values():
self.init_source_env(source, unresolved_env)
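# entries that referenced not-yet-defined values are retried here; the
# nested calls get no further retries of their own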
while unresolved_env and retries:
retries -= 1
unresolved_env = self.init_env(unresolved_env, 0)
return unresolved_env
def init_source_env(self, source: ConfigSource, unresolved_env: EnvDict) -> None:
"""Initialize the env declared in a single source."""
env = self.env
raw_config = source.raw_config
for env_key, env_value in raw_config.get("env", {}).items():
if env_key in env:
self.doitoml.log.info(
"$%s already set to `%s`: not setting with `%s` from %s",
env_key,
env[env_key],
env_value,
source,
)
continue
new_key_value = self.resolve_one_env(source, env_value)
if new_key_value is None:
unresolved_env[env_key] = env_value
else:
env[env_key] = new_key_value
unresolved_env.pop(env_key, None)
def resolve_one_env(self, source: ConfigSource, env_value: Any) -> Optional[str]:
"""Resolve a single env member."""
new_value = str(env_value)
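# the first DSL whose pattern matches handles the value; a failed
# transform leaves it unresolved for a later pass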
for dsl in self.doitoml.entry_points.dsl.values():
match = dsl.pattern.search(new_value)
if match is not None:
try:
resolved = dsl.transform_token(
source,
match,
new_value or env_value,
)
if resolved is None: # pragma: no cover
return None
return str(resolved[0])
except DoitomlError:
return None
return new_value
def init_paths(
self,
unresolved_paths: PrefixedStrings,
retries: int,
) -> PrefixedStrings:
"""Find all paths in all sources."""
for source in self.sources.values():
self.init_source_paths(source, unresolved_paths)
while unresolved_paths and retries:
retries -= 1
unresolved_paths = self.init_paths(
unresolved_paths,
0,
)
return unresolved_paths
def check_safe_path(self, path: PathOrString) -> str:
"""Check if some paths are safe."""
norm_path = normalize_path(path)
if any(norm_path.startswith(safe) for safe in self.safe_paths):
return str(path)
nl = "\n - "
message = (
f"The path is outside the known `safe_paths`: {norm_path}"
"\n"
f"""{nl}{nl.join(self.safe_paths)}"""
)
raise UnsafePathError(message)
def init_tokens(
self,
unresolved_tokens: PrefixedStrings,
retries: int,
) -> PrefixedStrings:
"""Find all commands in all sources."""
for source in self.sources.values():
self.init_source_tokens(source, unresolved_tokens)
while unresolved_tokens and retries:
retries -= 1
unresolved_tokens = self.init_tokens(
unresolved_tokens,
0,
)
return unresolved_tokens
def init_source_paths(
self,
source: ConfigSource,
unresolved_paths: PrefixedStrings,
) -> None:
"""Find the prefixed paths declared in a single source."""
raw_config = source.raw_config
path_key: str
for path_key, path_specs in raw_config.get("paths", {}).items():
found_paths, unresolved_specs = self.resolve_some_path_specs(
source,
path_specs,
source_relative=True,
)
if unresolved_specs:
unresolved_paths[source.prefix, path_key] = unresolved_specs
continue
self.paths[source.prefix, path_key] = sorted(
{normalize_path(p) for p in found_paths},
)
unresolved_paths.pop((source.prefix, path_key), None)
def init_source_tokens(
self,
source: ConfigSource,
unresolved_tokens: PrefixedStrings,
) -> None:
"""Find the prefixed paths declared in a single source."""
raw_config = source.raw_config
path_key: str
for path_key, path_specs in raw_config.get(DEFAULTS.TOKENS, {}).items():
found_tokens, unresolved_specs = self.resolve_some_path_specs(
source,
path_specs,
source_relative=False,
)
if unresolved_specs:
unresolved_tokens[source.prefix, path_key] = unresolved_specs
continue
self.tokens[source.prefix, path_key] = found_tokens
unresolved_tokens.pop((source.prefix, path_key), None)
def resolve_one_path_spec(
self,
source: ConfigSource,
spec: str,
source_relative: bool,
cwd: Optional[Path] = None,
) -> Optional[Strings]:
"""Resolve a single path in a task or one of the special field tables.
If ``source_relative``, transform unparsed strings into a path.
"""
cwd = cwd or source.path.parent
resolved = []
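# the first DSL whose pattern matches the spec handles it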
for dsl in self.doitoml.entry_points.dsl.values():
match = dsl.pattern.search(spec)
if match is not None:
resolved = dsl.transform_token(source, match, spec)
if resolved is None:
return None
break
if resolved:
if source_relative:
return [self.check_safe_path((cwd / r).resolve()) for r in resolved]
return resolved
if source_relative:
return [self.check_safe_path((cwd / spec).resolve())]
return [spec]
def init_templates(self) -> None:
"""Copy templates (for now)."""
for prefix, source in self.sources.items():
raw_templates = source.raw_config.get("templates", {})
if raw_templates:
self.templates[prefix] = raw_templates
def init_tasks(self) -> None:
"""Initialize all intermediate task representations."""
for prefix, source in self.sources.items():
raw_tasks = deepcopy(source.raw_config.get("tasks", {}))
if prefix in self.templates:
raw_templates = self.templates[prefix]
templaters = self.doitoml.entry_points.templaters
for templater_name, templater_kinds in raw_templates.items():
templater = templaters.get(templater_name)
if templater is None:
message = (
f"Templater {templater_name} not one of "
f"""{", ".join(templaters.keys())}"""
)
raise NoTemplaterError(message)
templater_tasks = deepcopy(templater_kinds.get("tasks", {}))
if not isinstance(templater_tasks, dict):
message = (
f"Expected dictionary of tasks in {source}, found: "
f"{templater_tasks}"
)
raise TemplaterError(message)
for task_name, task in templater_tasks.items():
templated = templater.transform_task(source, task)
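# a templater may return a single task, or an iterable of named subtasks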
if isinstance(templated, dict):
raw_tasks[task_name] = templated
else:
raw_tasks[task_name] = {t["name"]: t for t in templated}
for task_prefix, task in self.resolve_one_task_or_group(
source,
(prefix,),
raw_tasks,
):
claimed_prefix = self.tasks.get(task_prefix)
if claimed_prefix: # pragma: no cover
# not sure how we'd get here
pfx = ":".join(task_prefix)
message = f"""{source} cannot claim {pfx}: {claimed_prefix}"""
raise ConfigError(message)
self.tasks[task_prefix] = task
def resolve_one_task_or_group(
self,
source: ConfigSource,
prefixes: Tuple[str, ...],
task_or_group: Union[Task, Dict[str, Task]],
) -> PrefixedTaskGenerator:
"""Resolve a task or task group in a prefix."""
if not isinstance(task_or_group, dict):
message = f"{source} task {prefixes} is not a dict: {task_or_group}"
raise ConfigError(message)
maybe_old_actions: Optional[List[Action]] = task_or_group.get(
DOIT_TASK.ACTIONS,
) # type: ignore
meta = cast(dict, task_or_group.get(DOIT_TASK.META))
if isinstance(meta, dict) and NAME in meta:
dt_meta = meta[NAME]
if isinstance(dt_meta, dict) and DOITOML_META.SKIP in dt_meta:
skip = dt_meta.get(DOITOML_META.SKIP, "0")
if self.resolve_one_skip(source, skip):
return
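# a mapping with an ``actions`` key is treated as a leaf task; anything
# else is recursed into as a nested group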
if maybe_old_actions:
task = cast(Task, task_or_group)
yield from self.resolve_one_task(source, prefixes, task)
return
group = cast(Dict[str, Task], task_or_group)
for subtask_prefix, subtask_or_group in group.items():
yield from self.resolve_one_task_or_group(
source,
(*prefixes, subtask_prefix),
subtask_or_group,
)
def resolve_one_skip(self, source: ConfigSource, skip: Any) -> bool:
"""Maybe skip discovery of task (and all its children)."""
if skip is None:
return False
if isinstance(skip, (int, bool, float)):
return bool(skip)
if isinstance(skip, str):
found = self.resolve_one_path_spec(source, skip, source_relative=False)
if not found: # pragma: no cover
return False
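# string skips are resolved through the DSL, then interpreted as a
# JSON-style boolean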
json_val = json.loads(str(found[0]).lower().strip())
return bool(json_val)
if isinstance(skip, dict):
for key, skipper in self.doitoml.entry_points.skippers.items():
if key in skip:
return skipper.should_skip(source, skip[key])
message = f"Skip in {source} is ambiguous: {skip}"
raise SkipError(message)
def resolve_one_task(
self,
source: ConfigSource,
prefixes: Tuple[str, ...],
task: Task,
) -> PrefixedTaskGenerator:
"""Resolve a single simple task."""
new_task = self.normalize_task_meta(source, task)
unresolved: List[Any] = []
unresolved += self.resolve_task_actions(source, new_task)
unresolved += self.resolve_task_uptodate(source, new_task)
for field in DOIT_TASK.RELATIVE_LISTS:
unresolved += self.resolve_one_task_list_field(
source,
new_task,
field,
)
if unresolved:
message = f"{source} task {prefixes} had unresolved paths: {unresolved}"
raise UnresolvedError(message)
yield prefixes, new_task
def resolve_task_actions(self, source: ConfigSource, task: Task) -> List[str]:
"""Get the new actions (and unresolved paths) for a task."""
new_actions: List[Action] = []
for action in task.get(DOIT_TASK.ACTIONS, []):
action_actions, unresolved_specs = self.resolve_one_action(source, action)
if unresolved_specs:
return unresolved_specs
new_actions += action_actions
task[DOIT_TASK.ACTIONS] = new_actions
return []
def resolve_task_uptodate(self, source: ConfigSource, task: Task) -> List[str]:
"""Transform some uptodate values."""
new_uptodate: List[Any] = []
for old_uptodate in task.get(DOIT_TASK.UPTODATE, []):
if isinstance(old_uptodate, dict):
uptodate, unresolved = self.resolve_one_uptodate(source, old_uptodate)
if unresolved:
return unresolved
new_uptodate += [uptodate]
else:
new_uptodate += [old_uptodate]
if new_uptodate:
task[DOIT_TASK.UPTODATE] = new_uptodate
return []
def resolve_one_uptodate(
self,
source: ConfigSource,
uptodate: Dict[str, Any],
) -> Tuple[Dict[str, Any], Strings]:
"""Transform a single uptodate."""
for key, updater in self.doitoml.entry_points.updaters.items():
args = uptodate.get(key)
if args:
return {key: updater.transform_uptodate(source, args)}, []
return {}, [*uptodate.keys()]
def build_log_paths(
self,
source: ConfigSource,
log: Optional[Union[str, List[str]]],
cwd: Path,
) -> LogPaths:
"""Set up proper path behavior from log metadata."""
if not log:
return (None, None)
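# a single string routes both output streams to the same log file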
if isinstance(log, str) and log:
log = [log, log]
maybe_paths: List[Optional[Path]] = [None, None]
for i, stream in enumerate(log):
if not stream:
continue
maybe_stream_path = self.resolve_one_path_spec(
source,
str(stream),
source_relative=True,
cwd=cwd,
)
if not maybe_stream_path: # pragma: no cover
message = f"Unresolved log path {stream}"
raise ConfigError(message)
maybe_paths[i] = Path(self.check_safe_path(maybe_stream_path[0]))
return self.check_log_paths(*maybe_paths)
def check_log_paths(self, stderr_path: Any, stdout_path: Any) -> LogPaths:
"""Verify log paths."""
for path in [stderr_path, stdout_path]:
if path and path.is_dir():
message = f"A log path was a directory: {path}"
raise ConfigError(message)
if stderr_path is None and stdout_path is None:
message = "Expected log to be a string, or list of strings"
raise ConfigError(message)
return cast(LogPaths, (stderr_path, stdout_path))
def resolve_one_action(
self,
source: ConfigSource,
action: Any,
) -> Tuple[List[Any], Strings]:
"""Expand the members of a single action."""
if isinstance(action, str):
# leave raw strings to doit
return [action], []
if isinstance(action, list):
return self.resolve_one_token_action(source, action)
if isinstance(action, dict):
return self.resolve_one_dict_action(source, action)
message = f"Unexpected {action} in {source}"
raise ActionError(message)
def resolve_one_token_action(
self,
source: ConfigSource,
action: Any,
) -> Tuple[List[Any], Strings]:
"""Resolve an action made of shell tokens."""
unresolved_specs: Strings = []
new_tokens: PathOrStrings = []
for token in action:
token_tokens = self.resolve_one_path_spec(
source,
str(token),
source_relative=False,
)
if token_tokens:
new_tokens += token_tokens
else:
unresolved_specs += [token]
return [new_tokens], unresolved_specs
def resolve_one_dict_action(
self,
source: ConfigSource,
action: Any,
) -> Tuple[List[Any], Strings]:
"""Resolve an action by an actor."""
tried = []
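# the first actor that recognizes this action's shape transforms it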
for actor_name, actor in self.doitoml.entry_points.actors.items():
if actor.knows(action):
actor_action = actor.transform_action(source, action)
return actor_action, []
tried += [actor_name]
message = f"No actor knew how to perform {action}, tried: {tried}"
raise NoActorError(message)
def resolve_one_task_list_field(
self,
source: ConfigSource,
task: Task,
field: str,
) -> Strings:
"""Expand the members of a single field."""
specs: Strings = task.get(field, []) # type: ignore
new_paths, unresolved_specs = self.resolve_some_path_specs(
source,
specs,
source_relative=True,
)
if unresolved_specs:
return unresolved_specs
task[field] = new_paths # type: ignore
return []
def resolve_some_path_specs(
self,
source: ConfigSource,
specs: List[str],
source_relative: bool,
) -> Tuple[List[Any], Strings]:
"""Resolve a list of path specs."""
new_paths: PathOrStrings = []
unresolved_specs = []
for spec in specs:
spec_paths = self.resolve_one_path_spec(
source,
spec,
source_relative=source_relative,
)
if not spec_paths:
unresolved_specs += [spec]
continue
new_paths += spec_paths
return new_paths, unresolved_specs