Source code for doitoml.doitoml

"""Opinionated, declarative ``doit`` tasks from TOML, JSON, YAML, and more."""
import logging
import os
import subprocess
import sys
from io import TextIOBase
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast

import doit.action
import doit.tools

from .config import Config
from .constants import DOIT_TASK, DOITOML_META, NAME
from .entry_points import EntryPoints
from .errors import DoitomlError, EnvVarError, TaskError
from .types import (
    Action,
    ExecutionContext,
    GroupedTasks,
    PathOrStrings,
    PrefixedTasks,
    Task,
    TaskFunction,
    TaskGenerator,
)
from .utils.path import ensure_parents

MaybeLogLevel = Optional[Union[str, int]]


[docs] class DoiTOML: """An opinionated pyproject.toml-based doit task generator.""" config: Config log: logging.Logger entry_points: EntryPoints cwd: Path def __init__( self, config_paths: Optional[PathOrStrings] = None, *, cwd: Optional[Path] = None, update_env: Optional[bool] = None, fail_quietly: Optional[bool] = None, log: Optional[logging.Logger] = None, log_level: MaybeLogLevel = None, discover_config_paths: Optional[bool] = None, validate: Optional[bool] = None, safe_paths: Optional[List[str]] = None, ) -> None: """Initialize a ``doitoml`` task generator.""" self.cwd = Path(cwd) if cwd else Path.cwd() try: self.log = self.init_log(log, log_level) self.entry_points = EntryPoints(self) self.config = self.init_config( config_paths or [], update_env=update_env, fail_quietly=fail_quietly, discover_config_paths=discover_config_paths, validate=validate, safe_paths=safe_paths, ) # initialize late for ``entry_points`` that reference ``self.entry_points`` self.entry_points.initialize() self.config.initialize() except DoitomlError as err: if fail_quietly or ( fail_quietly is None and self.config and self.config.fail_quietly ): self.log.error("%s: %s", type(err).__name__, err) sys.exit(1) else: raise err if self.config.update_env: self.update_env()
[docs] def init_log( self, log: Optional[logging.Logger] = None, log_level: MaybeLogLevel = None, ) -> logging.Logger: """Initialize logging.""" log = log or logging.getLogger() if log_level: log.setLevel(log_level) return log
[docs] def init_config( self, config_paths: PathOrStrings, update_env: Optional[bool] = None, fail_quietly: Optional[bool] = None, discover_config_paths: Optional[bool] = None, validate: Optional[bool] = None, safe_paths: Optional[List[str]] = None, ) -> Config: """Initialize configuration.""" return Config( self, [(self.cwd / path).resolve() for path in config_paths], update_env=update_env, fail_quietly=fail_quietly, discover_config_paths=discover_config_paths, validate=validate, safe_paths=safe_paths, )
[docs] def tasks(self) -> Dict[str, TaskFunction]: """Generate functions compatible with the default ``doit`` loader style.""" tasks = {} task_groups = self.group_tasks(self.config.tasks) for task_name, subtasks in task_groups.items(): if not task_name: subgroup = self.group_tasks(subtasks) for subtask_name, sub2_tasks in subgroup.items(): task = self.build_task_group(subtask_name, sub2_tasks) tasks[task.__name__] = task else: task = self.build_task_group(task_name, subtasks) tasks[task.__name__] = task return tasks
[docs] def group_tasks(self, tasks: PrefixedTasks) -> GroupedTasks: """Group tasks by their first prefix.""" groups: GroupedTasks = {} for prefixes, task in tasks.items(): groups.setdefault(prefixes[0], {}).update({prefixes[1:]: task}) return groups
[docs] def get_env(self, key: str, default: Optional[str] = None) -> str: """Get an environment variable from the real (or in-progress) environment.""" value = os.environ.get(key, self.config.env.get(key, default)) if value is None: message = f"{key} was not found in any environment, no default given" raise EnvVarError(message) return value
[docs] def update_env(self) -> None: """Update environment variables.""" os.environ.update(self.config.env)
[docs] def build_task_group( self, prefix: str, subtasks: PrefixedTasks, ) -> TaskFunction: """Build a nested ``doit`` task group.""" def task() -> TaskGenerator: for subtask_name, subtask in subtasks.items(): if DOIT_TASK.ACTIONS in subtask: yield self.build_subtask(subtask_name, subtask) else: # pragma: no cover message = "Expected a task in {subtask_name} {subtask}" raise TaskError(message) task.__name__ = f"task_{prefix}" task.__doc__ = f"... {len(subtasks)} {prefix} tasks" return task
[docs] def build_subtask(self, task_name: Tuple[str, ...], raw_task: Task) -> Task: """Build a single generated ``doit`` task.""" name = ":".join(map(str, task_name)) task: Task = {"name": name} task.update(raw_task) meta = cast(dict, task.get(DOIT_TASK.META, {})) dt_meta = meta.get(NAME, {}) cwd = dt_meta.get(DOITOML_META.CWD) or self.cwd env = dt_meta.get(DOITOML_META.ENV, {}) log_paths = dt_meta.get(DOITOML_META.LOG) cmd_env = dict(os.environ) cmd_env.update(env) execution_context = ExecutionContext( cwd=cwd, log_paths=log_paths, env=cmd_env, log_mode="w", ) task[DOIT_TASK.ACTIONS] = self.build_subtask_actions(task, execution_context) task[DOIT_TASK.UPTODATE] = self.build_subtask_uptodates(task, execution_context) return cast(Task, task)
[docs] def build_subtask_actions( self, task: Task, execution_context: ExecutionContext, ) -> List[Action]: """Build all actions in a subtask.""" old_actions = task[DOIT_TASK.ACTIONS] new_actions: List[Any] = [(doit.tools.create_folder, [execution_context.cwd])] for idx, action in enumerate(old_actions): sub_execution_context = ExecutionContext( cwd=execution_context.cwd, env=execution_context.env, log_paths=execution_context.log_paths, log_mode="a" if idx else "w", ) action_actions = self.build_one_action(action, sub_execution_context) if action_actions is None: message = f"""{task["name"]} action {idx} is not a recognized action {action} """ raise TaskError(message) new_actions += action_actions return new_actions
[docs] def build_subtask_uptodates( self, task: Task, execution_context: ExecutionContext, ) -> List[Any]: """Expand custom updaters into actual functions.""" new_uptodates: List[Any] = [] for idx, uptodate in enumerate(task.get(DOIT_TASK.UPTODATE, [])): sub_execution_context = ExecutionContext( cwd=execution_context.cwd, env=execution_context.env, log_paths=execution_context.log_paths, log_mode="a" if idx else "w", ) new_uptodate: Any = None if not isinstance(uptodate, dict): new_uptodate = uptodate else: key, value = list(uptodate.items())[0] updater = self.entry_points.updaters[key] new_uptodate = updater.get_update_function(value, sub_execution_context) new_uptodates += [new_uptodate] return new_uptodates
[docs] def build_one_action( self, action: Action, execution_context: ExecutionContext, ) -> Optional[List[Action]]: """Build up a single action definition.""" is_shell = isinstance(action, str) is_tokens = isinstance(action, list) and all( isinstance(t, (str, Path)) for t in action ) if isinstance(action, dict): for actor in self.entry_points.actors.values(): if actor.knows(cast(dict, action)): return actor.perform_action(action, execution_context) if isinstance(action, (str, list)) and (is_shell or is_tokens): popen_kwargs = {"cwd": execution_context.cwd, "env": execution_context.env} if not any(execution_context.log_paths): return [doit.tools.CmdAction(action, **popen_kwargs, shell=is_shell)] args = [action] if isinstance(action, str) else list(map(str, action)) return [ ( self.logged_action, [args, popen_kwargs, execution_context], ), ] return None
[docs] def logged_action( self, args: List[str], popen_kwargs: Dict[str, Any], execution_context: ExecutionContext, ) -> bool: """Run a process, capturing the output to files.""" stdout, stderr = ensure_parents(*execution_context.log_paths) out = stdout.open(execution_context.log_mode) if stdout else None err = None if stderr: err = ( subprocess.STDOUT if stdout == stderr else stderr.open(execution_context.log_mode) ) streams: Dict[str, Any] = {"stdout": out, "stderr": err} rc = subprocess.call(args, **streams, **popen_kwargs) # noqa: S603 for stream in streams.values(): if isinstance(stream, TextIOBase): stream.close() return rc == 0