Source code for polyvers.cmdlet.cmdlets

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2015-2018 European Commission (JRC);
# Licensed under the EUPL 1.2+ (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
#
"""
Utils for building elaborate Commands/Sub-commands with traitlets [1]_ Application.

## Examples:

To run a base command, use this code::

    cmd = MainCmd.make_cmd(argv, **app_init_kwds)  ## `sys.argv` used if `argv` is `None`!
    cmd.start()

To run nested commands and print its output, use :func:`baseapp.chain_cmds()` like that::

    cmd = chain_cmds([MainCmd, Sub1Cmd, Sub2Cmd], argv)  ## `argv` without sub-cmds
    sys.exit(baseapp.pump_cmd(cmd.start()) and 0)

Of course you can mix'n match.

## Configuration and Initialization guidelines for *Spec* and *Cmd* classes

0. The configuration of :class:`HasTraits` instance gets stored in its ``config`` attribute.
1. A :class:`HasTraits` instance receives its configuration from 3 sources, in this order:

  a. code specifying class-properties or running on constructors;
  b. configuration files (*json* or ``.py`` files);
  c. command-line arguments.

2. Constructors must allow for properties to be overwritten on construction; any class-defaults
   must function as defaults for any constructor ``**kwds``.

3. Some utility code depends on trait-defaults (i.e. construction of help-messages),
   so for certain properties (e.g. description), it is preferable to set them
   as traits-with-defaults on class-properties.

4. Listen `Good Bait <https://www.youtube.com/watch?v=CE4bl5rk5OQ>`_ after 1:43.

.. [1] http://traitlets.readthedocs.io/
"""
from collections import OrderedDict
from os import PathLike
from typing import (
    Union, Optional, ContextManager,
    Callable, )  # @UnusedImport
import contextlib
import io
import logging
import os
import re

from boltons.setutils import IndexedSet as iset

import os.path as osp

from . import interpctxt
from .._vendor import traitlets as trt
from .._vendor.traitlets import config as trc
from .._vendor.traitlets.traitlets import (
    List as ListTrait, Union as UnionTrait
)  # @UnresolvedImport
from .._vendor.traitlets.traitlets import Bool, Unicode, Instance
from ..utils import fileutil as fu


log = logging.getLogger(__name__)


def class2cmd_name(cls):
    """
    Derive the command-line name of a command from its class name.

    A trailing ``cmd`` (in any letter-case) is dropped unless it is the whole name;
    then every run of capitals following a lower-case letter or a digit
    starts a new word, everything is lower-cased, and underscores become dashes.

    :param cls: the class (anything exposing ``__name__``)
    :return: the dashed lower-case name, e.g. ``FooBarCmd`` --> ``foo-bar``
    """
    stem = cls.__name__
    has_cmd_suffix = stem.lower().endswith('cmd')
    if has_cmd_suffix and len(stem) > 3:
        stem = stem[:-3]

    ## 'FooBar' --> 'Foo_Bar'
    underscored = re.sub('(?<=[a-z0-9])([A-Z]+)', r'_\1', stem)
    ## 'Foo_Bar' --> 'foo-bar'
    return underscored.lower().replace('_', '-')

def first_line(doc):
    """
    Return the first non-blank line of `doc`, stripped of surrounding whitespace.

    :param doc: the text to scan (e.g. a docstring); may be empty or None
    :return: the stripped line, or None if `doc` is empty/None or all-blank
    """
    if not doc:
        ## E.g. a class without docstring has `__doc__ = None`.
        return None
    for line in doc.split('\n'):
        stripped = line.strip()
        if stripped:
            return stripped
    return None


def _set_also_read_only_trait_values(self, **trait_values):
    """Allow to set even `read_only` values."""
    for k, v in trait_values.items():
        self.set_trait(k, v)


## Monkey-patch: give every `HasTraits` a bulk-setter going through `set_trait()`.
trt.HasTraits.set_trait_values = _set_also_read_only_trait_values  # type: ignore


#: Stub help-text, %-interpolated with the class that lacks any description.
_no_app_help_message = "<Help for '%s' is missing>"


def class_help_description_lines(app_class):
    """
    Return the wrapped help paragraphs describing a cmd-class.

    The class ``__doc__`` is preferred; only when that is empty, the default value
    of its ``description`` trait (if any) is used.
    Note that this order is the reverse of :meth:`Cmd.emit_description()`,
    because classes do not have the dynamic default of :meth:`Cmd._desc`.
    If neither yields a non-empty string, a "help is missing" stub is used.

    :param app_class: a class exposing ``class_traits()``
    :return: the result of ``trc.wrap_paragraphs()`` on the selected text
    """
    text = getattr(app_class, '__doc__', None)
    if not text:
        description_trait = app_class.class_traits().get('description')
        if description_trait:
            text = description_trait.default_value

    if not (isinstance(text, str) and text):
        text = _no_app_help_message % app_class

    return trc.wrap_paragraphs(text + '\n')
def cmd_class_short_help(app_class):
    """Return only the 1st help paragraph of a cmd-class."""
    paragraphs = class_help_description_lines(app_class)
    return paragraphs[0]
def build_sub_cmds(*subapp_classes):
    """
    Build an ordered-dictionary of ``cmd-name --> (cmd-class, help-msg)``.

    :param subapp_classes: the sub-command classes, in the order to list them
    :return: an :class:`OrderedDict` keyed by :func:`class2cmd_name()`
    """
    sub_cmds = OrderedDict()
    for subapp_class in subapp_classes:
        cmd_name = class2cmd_name(subapp_class)
        sub_cmds[cmd_name] = (subapp_class, cmd_class_short_help(subapp_class))
    return sub_cmds
def cmd_line_chain(cmd):
    """
    Utility returning the cmd-line(str) that launched a :class:`Cmd`.

    :param cmd: an object whose ``my_cmd_chain()`` lists the cmds, innermost first
    :return: the space-separated cmd names, outermost first
    """
    names = [link.name for link in cmd.my_cmd_chain()]
    names.reverse()
    return ' '.join(names)
def chain_cmds(app_classes, argv=None, **root_kwds):
    """
    Instantiate a list of ``[cmd, subcmd, ...]``, linking children to parents.

    :param app_classes:
        An iterable of cmd-classes: ``[root, sub1, sub2, app]``
        Note: you have to "know" the correct nesting-order of the commands ;-)
    :param argv:
        cmdline args are passed to all cmds; make sure they do not contain
        any sub-cmds, or the chain will be broken.
        Like :meth:`initialize()`, if undefined, replaced with ``sys.argv[1:]``.
    :param root_kwds:
        keywords passed only to the constructor of the root (1st) cmd
    :return:
        The root(1st) cmd, to invoke :meth:`Application.start()` on.
    :raise ValueError:
        if no classes were given (including an exhausted iterator),
        or if some item is not an instance of the metaclass of `Application`

    Apply the :func:`pump_cmd()` or `collect_cmd()` on the returned instance.

    - Normally `argv` contains any sub-commands, and it is enough to invoke
      ``initialize(argv)`` on the root cmd.  This function shortcuts
      arg-parsing for subcmds with explicit cmd-chaining in code.
    - This function is the 1st half of :meth:`Cmd.launch_instance()`.
    """
    ## Materialize BEFORE the emptiness check: iterators/generators are always truthy,
    #  so an empty one would otherwise slip through and fail later with IndexError.
    app_classes = list(app_classes or ())
    if not app_classes:
        raise ValueError("No cmds to chained passed in!")

    root = app = None
    for app_cl in app_classes:
        if not isinstance(app_cl, type(trc.Application)):
            raise ValueError("Expected an Application-class instance, got %r!" % app_cl)
        if not root:
            ## The 1st cmd is always orphan, and gets returned.
            root = app = app_cl(**root_kwds)
        else:
            ## Link the new child on its parent, then descend into it.
            child = app_cl(parent=app)
            app.subapp = child
            app = child
        app.initialize(argv)

    ## The root class's singleton slot receives the last cmd of the chain.
    app_classes[0]._instance = app

    return root
class CfgFilesRegistry(contextlib.ContextDecorator):
    """
    Locate and account extensioned files (by default ``.json|.py``).

    - Collects all supported files present in a `path_list`
      (see :meth:`collect_fpaths()`), or
    - invoke :meth:`visit_file()` for every "manually" visited config-file,
      successful or not.
    - Files collected earlier should override next ones.
    """

    def __init__(self, supported_cfg_extensions='.json .py'.split()):
        """
        :param list supported_cfg_extensions:
            file extensions (with dot), in the order to search.
        """
        self.supported_cfg_extensions = tuple(supported_cfg_extensions)
        self._visited_tuples = []
        ## Created up-front, so that `visit_file()` works
        #  even if `collect_fpaths()` has not run yet.
        self.collected_paths = iset()

    #: A list of 2-tuples ``(folder, fname)`` for every visited config-file,
    #: in visiting order; `fname` is None for files not loaded.
    _visited_tuples = None

    @property
    def config_tuples(self):
        """
        The consolidated list of visited 2-tuples ``(folder, [fname, ...])``.

        See :meth:`_consolidate()` for the merging rules.
        """
        return self._consolidate(self._visited_tuples)

    @staticmethod
    def _consolidate(visited_tuples):
        """
        Merge consecutive records of the same folder, dropping empty file-names.

        The order of the records is preserved.

        Example::

            >>> CfgFilesRegistry._consolidate([
            ...     ('a/b/', None),
            ...     ('a/b/', 'F1'),
            ...     ('a/b/', 'F2'),
            ...     ('a/b/', None),
            ...     ('c/c/', None),
            ...     ('c/c/', None),
            ...     ('d/', 'F1'),
            ...     ('d/', None),
            ...     ('c/c/', 'FF')])
            [('a/b/', ['F1', 'F2']), ('c/c/', []), ('d/', ['F1']), ('c/c/', ['FF'])]
        """
        consolidated = []
        prev = None
        for b, f in visited_tuples:
            if not prev:            # loop start
                prev = (b, [])
            elif prev[0] != b:      # new dir
                consolidated.append(prev)
                prev = (b, [])
            if f:
                prev[1].append(f)

        if prev:
            consolidated.append(prev)

        return consolidated

    def visit_file(self, fpath, loaded):
        """
        Invoke this for every visited config-file, in visiting order.

        :param fpath: the path of the visited file
        :param bool loaded: Loaded successfully?
        """
        base, fname = osp.split(fpath)
        if loaded:
            self.collected_paths.add(fpath)
            pair = (base, fname)
        else:
            pair = (base, None)

        self._visited_tuples.append(pair)

    def collect_fpaths(self, path_list):
        """
        Collect all supported files present in the `path_list`, (descending order).

        Besides each path (tried with every supported extension),
        any supported files inside a sibling ``<path>.d/`` folder are collected,
        alphabetically sorted.

        :param path_list:
            A list of paths (absolute, relative, dir or folders).
        :type path_list:
            List[Text]
        :return:
            the list of collected file-paths, with ext
        """
        collected_paths = self.collected_paths = iset()
        cfg_exts = self.supported_cfg_extensions

        def try_file_extensions(basepath):
            loaded_any = False
            for ext in cfg_exts:
                f = fu.ensure_file_ext(basepath, ext)
                if f in collected_paths:
                    continue

                loaded = osp.isfile(f)
                self.visit_file(f, loaded=loaded)
                loaded_any |= loaded

            ## Load any files in `conf.d/`, alphabetically-sorted.
            #
            for ext in ('', ) + cfg_exts:
                if basepath.endswith(ext):
                    ## Chop exactly the matched suffix; `str.rstrip(ext)` would strip
                    #  ANY trailing chars of `ext` (e.g. 'session.json' --> 'sessi').
                    stem = basepath[:len(basepath) - len(ext)]
                    conf_d = fu.ensure_file_ext(stem, '.d')
                    if os.path.isdir(conf_d):
                        for f in sorted(os.listdir(conf_d)):
                            loaded = f.endswith(cfg_exts)
                            self.visit_file(osp.join(conf_d, f), loaded=loaded)
                            loaded_any |= loaded

            return loaded_any

        def _derive_config_fpaths(path):  # -> List[Text]: TODO: enable cmdlet typing comments
            """Visit the candidate files for a single config-file path (folder/file)."""
            p = fu.convpath(path)

            loaded_any = try_file_extensions(p)
            ## Do not strip ext if has matched WITH ext.
            if not loaded_any:
                try_file_extensions(osp.splitext(p)[0])

        for cf in path_list:
            _derive_config_fpaths(cf)

        return list(collected_paths)

    def head_folder(self):
        """
        The first existing folder in :attr:`config_tuples` (if any),
        even if not containing files.

        :return: the folder path, or None if no visited folder exists
        """
        for dirpath, _ in self.config_tuples:
            if osp.exists(dirpath):
                assert osp.isdir(dirpath), ("Expected to be a folder:", dirpath)
                return dirpath
        return None
class PathList(ListTrait):
    """Trait that splits unicode strings on `os.pathsep` to form the list of paths."""

    def __init__(self, *args, **kwargs):
        ## Elements may be either strings or path-like objects.
        element_trait = UnionTrait((Unicode(), Instance(PathLike)))
        super().__init__(*args, trait=element_trait, **kwargs)

    def validate(self, obj, value):
        """Break all elements also into `os.pathsep` segments."""
        validated = super().validate(obj, value)
        segments = []
        for element in validated:
            for segment in os.fspath(element).split(os.pathsep):
                segments.append(os.fspath(segment))
        return segments

    def from_string(self, s):
        """Split a non-empty string on the path-separator; empty values pass as is."""
        return s.split(osp.pathsep) if s else s
class CmdException(Exception):
    """Error raised by commands, carrying a message for the user."""
class Replaceable:
    """
    A mixin to make :class:`HasTraits` instances clone like namedtuple's ``replace()``.

    Values are assigned through ``set_trait_values()``, which is expected
    to be provided by the class this mixin is combined with;
    it works nicely with *read-only* traits.
    """

    @classmethod
    def new(cls, **trait_values):
        """
        Construct a no-args instance and assign the given traits on it.

        :param trait_values: values keyed by their trait-name
        """
        instance = cls()
        instance.set_trait_values(**trait_values)
        return instance

    def replace(self, **changes):
        """
        Return a shallow copy of self with some trait-values changed.

        :param changes: values keyed by their trait-name
        """
        from copy import copy

        duplicate = copy(self)
        duplicate.set_trait_values(**changes)
        return duplicate

    def _load_config(self, cfg, section_names=None, traits=None):
        """Load traits from a Config object."""
        ## Overridden just to allow readonly traits to read from configs.

        from copy import deepcopy
        from .._vendor.traitlets.config.loader import _is_section_key

        if traits is None:
            traits = self.traits(config=True)
        if section_names is None:
            section_names = self.section_names()

        my_config = self._find_my_config(cfg)

        def warn_unrecognized(option):
            """Warn about an unknown config option, suggesting close trait-names."""
            from difflib import get_close_matches

            if isinstance(self, trc.LoggingConfigurable):
                warn = self.log.warning
            else:
                import warnings
                warn = lambda msg: warnings.warn(msg, stacklevel=9)

            matches = get_close_matches(option, traits)
            msg = u"Config option `{option}` not recognized by `{klass}`.".format(
                option=option, klass=self.__class__.__name__)
            if len(matches) == 1:
                msg += u" Did you mean `{matches}`?".format(matches=matches[0])
            elif len(matches) >= 1:
                msg += " Did you mean one of: `{matches}`?".format(
                    matches=', '.join(sorted(matches)))
            warn(msg)

        # Hold trait notifications until after all config has been loaded.
        with self.hold_trait_notifications():
            for name, config_value in my_config.items():
                if name not in traits:
                    is_plain_value = not isinstance(config_value, trc.Config)
                    if not _is_section_key(name) and is_plain_value:
                        warn_unrecognized(name)
                    continue

                if isinstance(config_value, trc.LazyConfigValue):
                    # A wrapper for using append / update on containers
                    # without having to copy the initial value.
                    initial = getattr(self, name)
                    config_value = config_value.get_value(initial)

                # Deepcopy, or else a mutable config_value would be shared
                # by all instances, effectively making it a class attribute.
                self.set_trait(name, deepcopy(config_value))
class Printable(metaclass=trt.MetaHasTraits):
    """
    A :class:`HasTraits` mixin making ``str()`` return ``class(trait=value, ...)`` from traits.

    Which traits to print is decided by ``traitquery.select_traits()``,
    queried with this mixin class.
    """

    def __str__(self):
        from . import traitquery

        label = getattr(self, 'name', type(self).__name__)
        pairs = []
        for tname in traitquery.select_traits(self, Printable):
            pairs.append('%s=%s' % (tname, getattr(self, tname)))
        return '%s(%s)' % (label, ', '.join(pairs))
class Forceable(metaclass=trt.MetaHasTraits):
    """Mixin to facilitate "forcing" actions by ignoring/delaying their errors."""

    force = ListTrait(
        UnionTrait((Bool(), Unicode())),
        config=True,
        help="Force things to perform their duties without complaints.")

    def is_forced(self, token: Union[str, bool] = True):
        """
        Whether some action identified by `token` is allowed to go through in case of errors.

        :param token:
            an optional string/bool to search for in :attr:`force`,
            according to the following table (``O``: forced, ``X``: not forced,
            ``=``: forced only if equal)::

                                   token: NONE/FALSE | TRUE | "str"
                force contains:
                    nothing, or FALSE |       X      |   X  |   X
                                 TRUE |       X      |   O  |   X
                                  '*' |       X      |   X  |   O
                                "str" |       X      |   X  |   =

            - Rows above, win; columns to the left win.
            - To catch all tokens, use ``--force=true, --force='*'``

        .. Tip::
            prefer using it via :class:`ErrLog` contextman.
        """
        assert token is None or isinstance(token, (bool, str)), token

        forced = set(self.force)
        if not (token and forced) or False in forced:
            return False
        if token in forced:
            return True
        ## Only string tokens are matched by the wildcard.
        return isinstance(token, str) and '*' in forced

    def errlogged(self, *exceptions,
                  token: Union[bool, str] = None,
                  doing=None,
                  raise_immediately=None,
                  warn_log: Callable = None,
                  info_log: Callable = None
                  ):
        """
        A context-man for nesting :class:`ErrLog` instances.

        - All arguments are forwarded to ``errlog.nesterrlog()``, with self in front;
          see :class:`ErrLog` for their meaning.
        - The pre-existing `errlog` is searched in :data:`_current_errlog` attribute.
        - The `_current_errlog` on entering context, is restored on exit;
          original is `None`.
        - The returned errlog always has its :attr:`ErrLog.parent` set to this enforceable.
        - Example of using this method for multiple actions in a loop::

            with self.errlogged(OSError,
                                doing="loading X-files",
                                token='fread'):
                for fpath in file_paths:
                    with self.errlogged(doing="reading '%s'" % fpath):
                        fbytes.append(fpath.read_bytes())

            # Any errors collected above, will be raised/WARNed here.
        """
        ## TODO: decouple `force` from `ErrLog`.
        from . import errlog

        nested_kwds = dict(
            token=token,
            doing=doing,
            raise_immediately=raise_immediately,
            warn_log=warn_log,
            info_log=info_log,
        )
        return errlog.nesterrlog(self, *exceptions, **nested_kwds)
class CmdletsInterpolation(interpctxt.InterpolationContext):
    """
    Adds `cmdlets_map` into interp-manager for help & cmd mechanics.

    The map starts with stub values for the ``appname`` and ``cmd_chain`` keys.
    Client-code may add more dicts in `interpolation_context.maps` list.
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        stub_map = {
            'appname': '<APP>',
            'cmd_chain': '<CMD>',
        }
        self.cmdlets_map = stub_map
        self.maps.append(stub_map)
#: That's the singleton interp-manager used by all cmdlet configurables.
cmdlets_interpolations = CmdletsInterpolation()


def _travel_parents(self) -> trc.Configurable:
    """
    Utility to travel up parent-chain.

    :return:
        the top parent (or self if no parents)
    """
    node = self
    while node.parent:
        node = node.parent
    return node


trc.Configurable.root_object = _travel_parents  # type: ignore


def _travel_parents_untill_active_cmd(self, scream=False) -> trc.Application:
    """
    Utility to travel up parent-chain until the active subcmd is met.

    The "active" one is the 1st object (self included)
    having a ``subapp`` attribute which is None.

    :return:
        the active subcmd, or None if not `scream`
    :raise AssertionError:
        if `scream` and no active subcmd found
    """
    def is_active(app):
        ## A missing `subapp` attribute does not count as active.
        return getattr(app, 'subapp', False) is None

    node = self
    while node.parent:
        if is_active(node):
            return node
        node = node.parent

    ## The loop above exits before testing the topmost object.
    if is_active(node):
        return node

    if scream:
        raise AssertionError('ROOTED!')
    return None


#: NOT USED!!
trc.Configurable.active_subcmd = _travel_parents_untill_active_cmd  # type: ignore
class Spec(Forceable, trc.LoggingConfigurable):
    """Common properties for all configurables."""

    @classmethod
    def class_get_trait_help(cls, trait, inst=None, helptext=None):
        """Like the inherited method, with the help-text interpolated (missing keys stubbed)."""
        raw_help = super().class_get_trait_help(trait, inst=inst, helptext=helptext)
        owner = inst if inst else cls
        return owner.interpolations.interp(raw_help,
                                           _stub_keys=True,
                                           _suppress_errors=True)

    def _log_default(self):
        """Mimic log-hierarchies for Configurable; their loggers are not hierarchical."""
        klass = type(self)
        logger_name = '.'.join((klass.__module__, klass.__name__))
        return logging.getLogger(logger_name)

    verbose = Bool(
        allow_none=True,
        config=True,
        help="Repeated use increase logging-level from WARNING-->INFO-->DEBUG.")

    debug = Bool(
        allow_none=True,
        config=True,
        help="Stricter actions, to discover possible problems.")

    dry_run = Bool(
        allow_none=True,
        config=True,
        help="Do not write files - just pretend.")

    # TODO: refact to hide `Spec.interpolations` attribute.
    interpolations = cmdlets_interpolations

    def ikeys(self, *maps, **kwds) -> ContextManager[CmdletsInterpolation]:
        """
        Temporarily place self before the given maps and kwds in interpolation-cntxt.

        - Self has the least priority, kwds the most.
        - For params, see :meth:`interp.InterpolationContext.interp()`.

        .. Attention::
            Must use ``str.format_map()`` when `_stub_keys` is true;
            otherwise, ``format()`` will clone all existing keys in
            a static map.
        """
        return self.interpolations.ikeys(self, *maps, **kwds)

    def interp(self, text: Optional[str], *maps, **kwds) -> Optional[str]:
        """
        Interpolate text with self attributes before maps and kwds given.

        :param text:
            the text to interpolate; None/empty returned as is

        - For params, see :meth:`interp.InterpolationContext.interp()`.
        - Self has the least priority, kwds the most.
        """
        if not text:
            return text

        with self.ikeys(*maps, **kwds) as cntx:
            return text.format_map(cntx)
[docs]class Cmd(trc.Application, Spec): "Common machinery for all (sub)commands."
[docs] @classmethod def make_cmd(app, argv=None, **kwargs): """ Instanciate, initialize and return application. :param argv: Like :meth:`initialize()`, if undefined, replaced with ``sys.argv[1:]``. - Tip: Apply :func:`pump_cmd()` on return values to process generators of :meth:`run()`. - This functions is the 1st half of :meth:`launch_instance()` which invokes and discards :meth:`start()` results. """ ## Overriden just to return `start()` AND fix ipython/traitlets#474 # when cmds inherit same class. app.clear_instance() cmd = app.instance(**kwargs) cmd.initialize(argv) return cmd
@trt.default('name') def _name(self): """Without it, need to set `name` attr on every class.""" name = class2cmd_name(type(self)) return name @trt.default('description') def _desc(self): """Without it, need to set `description` attr on every class.""" cls = type(self) return cls.__doc__ or '' def _log_default(self): "Mimic log-hierarchies for Configurable; their loggers are not hierarchical. " cls = type(self) return logging.getLogger('.'.join((cls.__module__, cls.__name__))) ########## ## HELP ## ##########
[docs] def emit_description(self): ## Overridden for interpolating app-name. txt = self.description or self.__doc__ or _no_app_help_message % type(self) txt = self.interp(txt, _stub_keys=True, _suppress_errors=True) for p in trc.wrap_paragraphs('%s: %s' % (cmd_line_chain(self), txt)): yield p yield ''
option_description = Unicode(""" Options are convenience aliases to configurable class-params, as listed in the "Equivalent to" description-line of the aliases. To see all configurable class-params for some <cmd>, use:: <cmd> --help-all or view help for specific parameter using:: {appname} desc <class>.<param> """.strip())
[docs] def emit_options_help(self): """Yield the lines for the options part of the help.""" if not self.flags and not self.aliases: return header = 'Options' yield header yield '=' * len(header) opt_desc = self.interp(self.option_description, _stub_keys=True, _suppress_errors=True) for p in trc.wrap_paragraphs(opt_desc): yield p yield '' for l in self.emit_flag_help(): yield l for l in self.emit_alias_help(): yield l yield ''
subcommand_description = Unicode(""" Subcommands are launched as:: {cmd_chain} <subcmd> [args] """)
[docs] def emit_subcommands_help(self): """Yield the lines for the subcommand part of the help.""" if not self.subcommands: return header = "Subcommands" yield header yield '=' * len(header) for p in trc.wrap_paragraphs(self.interp(self.subcommand_description, _stub_keys=True, _suppress_errors=True)): yield p yield '' for subc, (cls, hlp) in self.subcommands.items(): yield subc if hlp: yield trc.indent(trc.dedent(hlp.strip())) yield ''
[docs] def emit_examples(self): ## Overridden for interpolating app-name. if self.examples: txt = self.interp(self.examples, _stub_keys=True, _suppress_errors=True).strip() yield "Examples" yield "--------" yield '' yield trc.indent(trc.dedent(txt)) yield ''
[docs] def emit_help_epilogue(self, classes=None): """Yield the very bottom lines of the help message. If classes=False (the default), print `--help-all` msg. """ if not classes: epilogue = trc.dedent(""" -------- - For available option, configuration-params & examples, use: {cmd_chain} help (OR --help-all) - For help on specific classes/params, use: {appname} config desc <class-or-param-1>... - To inspect configuration values: {appname} config show <class-or-param-1>... """) yield self.interp(epilogue, _stub_keys=True, _suppress_errors=True)
############ ## CONFIG ## ############ @trt.observe('parent') def _inherit_parent_cmd(self, change): if self.parent: parent = self.parent if parent.flags: flags = self.flags for k, v in parent.flags.items(): flags.setdefault(k, v) if parent.aliases: aliases = self.aliases for k, v in parent.aliases.items(): aliases.setdefault(k, v) ## Need also classes bc flags/aliases may depend on them # if parent.classes: classes = set(self.classes) classes.update(parent.classes) self.classes = list(classes) config_paths = PathList( help=""" Absolute/relative folder/file path(s) to read "static" config-parameters from. - Sources for this parameter can either be CLI or ENV-VAR; since the loading of config-files depend on this parameter, file-configs are ignored. - Multiple values may be given and each one may be separated by '%(sep)s'. Priority is descending, i.e. config-params from the 1st one overrides the rest. - For paths resolving to existing folders, the filenames `<basename>(.py|.json)` are appended and searched (in this order); otherwise, any file-extension is ignored, and the mentioned extensions are combined and searched. Tips: - Use `config infos` to view the actual paths/files loaded. - Use `config write` to produce a skeleton of the config-file. Examples: To read and apply in descending order: [~/my_conf, /tmp/conf.py, ~/.{appname}.json] you may issue: <cmd> --config-paths=~/my_conf%(sep)s/tmp/conf.py --Cmd.config_paths=~/.{appname}.jso """ % {'sep': osp.pathsep} ## TODO: Simplify path-loading when /ipython/traitlets#242 merged?? # NOTE: Patch default-value on `Cmd` so all subcmds load same configs. ).tag(config=True) _cfgfiles_registry: Optional[CfgFilesRegistry] = None @property def loaded_config_files(self): return self._cfgfiles_registry and self._cfgfiles_registry.config_tuples or [] config_basename = Unicode( help="""" The config-file's basename (no path or extension) to search when not explicitly specified. 
By default, it's the root app's name, prefixed with a dot('.'). """) @trt.default('config_basename') def _config_basename(self): return '.' + self.root_object().name def _collect_static_fpaths(self, config_paths=None): """ Return fully-normalized paths, with ext. """ config_paths = self.config_paths if config_paths is None else config_paths self._cfgfiles_registry = CfgFilesRegistry('.json .yaml .py'.split()) fpaths = self._cfgfiles_registry.collect_fpaths(config_paths) return fpaths def _read_supported_configs(self, cfpath): """ :param str cfpath: The absolute config-file path with either ``.py`` or ``.json`` ext. """ from .yamlconfloader import YAMLFileConfigLoader log = self.log loaders = { '.py': trc.PyFileConfigLoader, '.json': trc.JSONFileConfigLoader, '.yaml': YAMLFileConfigLoader, } ext = osp.splitext(cfpath)[1] loader = loaders.get(str.lower(ext)) assert loader, cfpath # Must exist. config = None try: config = loader(cfpath, path=None, log=log).load_config() except trc.ConfigFileNotFound: ## Config-file deleted between collecting its name and reading it. pass except Exception as ex: if self.raise_config_file_errors: raise log.error("Failed loading config-file '%s' due to: %s", cfpath, ex, exc_info=True) else: log.debug("Loaded config-file: %s", cfpath) return config
[docs] def read_config_files(self, config_paths=None): # -> trc.Config """ Load :attr:`config_paths` and maintain :attr:`config_registry`. :param config_paths: optional paths to override those in `config_paths` trait, in descending order (1st overrides the rest). :return: the static_config loaded - Configuration files are read and merged from ``.json`` and/or ``.py`` files in :attr:`config_paths`. - Adapted from :meth:`load_config_file()` & :meth:`_load_config_files()` but without applying configs on the app, just returning them. """ config_paths = self._collect_static_fpaths(config_paths) new_config = trc.Config() ## Registry to detect collisions. loaded = {} # type: Dict[Text, Config] for cfpath in config_paths[::-1]: config = self._read_supported_configs(cfpath) if config: for filename, earlier_config in loaded.items(): collisions = earlier_config.collisions(config) if collisions: import json self.log.warning( "Collisions detected in %s and %s config files." " %s has higher priority: %s", filename, cfpath, cfpath, json.dumps(collisions, indent=2) ) loaded[cfpath] = config new_config.merge(config) return new_config
def write_default_config(self, config_file=None, force=False): if config_file: config_file = fu.convpath(config_file) if osp.isdir(config_file): config_file = osp.join(config_file, self.config_basename) elif self.config_paths: config_file = self.config_paths[0] else: raise AssertionError("No config-file given to write to!") config_file = fu.ensure_file_ext(config_file, '.py') is_overwrite = osp.isfile(config_file) if is_overwrite: if not force: raise CmdException("Config-file '%s' already exists!" "\n Specify `--force` to overwrite." % config_file) else: import shutil from datetime import datetime now = datetime.now().strftime('%Y%m%d-%H%M%S%Z') backup_name = '%s-%s.py' % (osp.splitext(config_file)[0], now) shutil.move(config_file, backup_name) op_msg = ", old file renamed --> '%s'" % backup_name else: op_msg = "" self.log.info("Writting config-file '%s'%s...", config_file, op_msg) fu.ensure_dir_exists(os.path.dirname(config_file), 0o700) config_text = self.generate_config_file() with io.open(config_file, mode='wt') as fp: fp.write(config_text) all_app_configurables = ListTrait( help=""" A sequence of all app configurables to feed into `config` sub-command. Defined either on :class:`Cmd` superclass or on *root-cmd*. """ ) ############# ## SUBAPPS ## ############# ## Overriden for existing sub-apps to die and new ones to receive parents # even when hierarchy changes (e.g. in TCs). # See https://github.com/ipython/traitlets/commit/e857996#commitcomment-27681994
[docs] @trc.catch_config_error def initialize_subcommand(self, subc, argv=None): subapp, _ = self.subcommands.get(subc) if isinstance(subapp, trc.six.string_types): subapp = trc.import_item(subapp) ## Cannot issubclass() on a non-type (SOhttp://stackoverflow.com/questions/8692430) if isinstance(subapp, type) and issubclass(subapp, trc.Application): # Clear existing instances before... #type(self).clear_instance() subapp.clear_instance() # instantiating subapp... self.subapp = subapp.instance(parent=self) elif callable(subapp): # or ask factory to create it... self.subapp = subapp(self) else: raise AssertionError("Invalid mappings for subcommand '%s'!" % subc) # ... and finally initialize subapp. ## NOTE: do not call __wrapped__ bc relying on captured failures # to distinguish which args belong to next subcmds. self.subapp.initialize(argv)
[docs] @classmethod def clear_instance(cls): if not cls.initialized(): return cls._instance = None
[docs] @classmethod def instance(cls, *args, **kwargs): # Create and save the instance if cls._instance is None: inst = cls(*args, **kwargs) obj = cls._instance = inst elif isinstance(cls._instance, cls): obj = cls._instance else: raise trc.MultipleInstanceError( "An incompatible sibling of '%s' is already instanciated" " as singleton: %s" % (cls.__name__, type(cls._instance).__name__) ) return obj
############# ## STARTUP ## #############
[docs] def my_cmd_chain(self): """Return the chain of cmd-classes starting from my self or subapp.""" cmd_chain = [] pcl = self.subapp if self.subapp else self while pcl: cmd_chain.append(pcl) pcl = pcl.parent return cmd_chain
def _is_dispatching(self):
    """Tell whether :attr:`subapp` holds a :class:`trc.Application` instance."""
    subapp = self.subapp  # subapp == trait | subcmd | None
    return isinstance(subapp, trc.Application)


def update_interp_context(self, argv=None):
    """
    Store the ``cmd_chain`` and ``appname`` keys into the interpolations' `cmdlets_map`.

    :param argv:
        not used by this implementation
    """
    interp_map = self.interpolations.cmdlets_map
    interp_map['cmd_chain'] = cmd_line_chain(self)
    app_name = self.root_object().name
    interp_map['appname'] = app_name
@trc.catch_config_error
def initialize(self, argv=None):
    """
    Invoked after __init__() by `make_cmd()` to apply configs and build subapps.

    Command-line args are parsed before file-configs, to detect sub-commands
    and update any :attr:`config_paths`;  then all file-configs are read,
    and finally the cmd-line configs are re-applied as overrides
    (trick copied from `jupyter-core`).

    :param argv:
        If undefined, they are replaced with ``sys.argv[1:]``!
    """
    self.update_interp_context()
    super().initialize.__wrapped__(self, argv)  # not to re-catch_config_error

    if not self._is_dispatching():
        ## Only the final child reads file-configs.
        #  Also avoid contaminations with user if generating-config.
        config = self.read_config_files()
        config.merge(self.cli_config)

        ## Ensure cmd-chain configured, or else
        #  root-app would have been configed only from cmd-line args.
        #
        cmd = self
        while cmd:
            cmd.update_config(config)
            cmd = cmd.parent
def start(self):
    """
    Delegate to the sub-cmd's ``start()`` if there is one, or else to :meth:`run()`.

    If overriden, better invoke :func:`super()`, but even better
    to override :meth:`run()`.

    :return:
        whatever the sub-cmd's ``start()`` returned, or
        the result of :meth:`run()` called with :attr:`extra_args`
    """
    if self.subapp is not None:
        return self.subapp.start()
    return self.run(*self.extra_args)
def run(self, *args):
    """
    Leaf sub-commands must override this instead of :meth:`start()`,
    without invoking :func:`super()`.

    This default implementation always fails, complaining either about
    an unknown sub-cmd (the 1st of `args`) or about a missing one,
    and listing the known :attr:`subcommands`.

    :param args:
        Invoked by :meth:`start()` with :attr:`extra_args`.
    :raise CmdException:
        always (when :attr:`subcommands` is not empty)
    """
    import ipython_genutils.text as tw

    assert self.subcommands, "Override run() method in cmd subclasses."

    subcmd_msg = ("unknown sub-command `%s`!" % args[0]
                  if args
                  else "sub-command is missing!")
    subcmd_lines = [' %10s: %s' % (name, desc)
                    for name, (_, desc) in self.subcommands.items()]

    template = tw.dedent(
        """ %(cmd_chain)s: %(subcmd_msg)s Try one of: %(subcmds)s %(epilogue)s""")
    fields = {
        'subcmd_msg': subcmd_msg,
        'cmd_chain': cmd_line_chain(self),
        'subcmds': '\n'.join(subcmd_lines),
        'epilogue': '\n'.join(self.emit_help_epilogue()),
    }
    raise CmdException(template % fields)
##################
## YAML CONFIGS ##
## patch traits ##
##################

def _dumpable_trait_value(cls, trait, config):
    """
    Pick the configured value of a trait, if it is worth dumping.

    :param cls:
        the class being dumped; traits whose ``this_class`` is another class
        are ignored
    :param trait:
        the trait to look up in `config`, under ``<class-name>.<trait-name>``
    :param config:
        the configs to search into
    :return:
        a ``(cfg_value, default_value)`` pair when the trait is owned by `cls`,
        it is contained in `config` and its value differs from the trait's
        default;  `None` otherwise
    """
    if trait.this_class is not cls:
        return None

    section, option = cls.__name__, trait.name
    if '%s.%s' % (section, option) not in config:
        return None

    cfg_value = config[section][option]
    default_value = trait.default()
    if cfg_value != default_value:
        return cfg_value, default_value
    return None


def _make_comment(s):
    """Wrap the paragraphs of `s` at 78 chars and join them into a single block of text."""
    from ipython_genutils.text import wrap_paragraphs

    paragraphs = wrap_paragraphs(s, 78)
    return '\n \n '.join(paragraphs) + '\n '
def class_config_yaml(cls, outer_cfg,  # noqa: C901  # too complex: TODO: FIX Yaml+contextvars
                      classes=None,
                      config: trc.Config = None):
    """
    Get the config section for this Configurable.

    The section is a ``CommentedMap`` of trait-names --> values, which gets
    stored into `outer_cfg` under the name of this class;  nothing is stored
    if no trait ends up in the section.  Nothing is returned.

    :param outer_cfg:
        The mapping to receive the section of this class (and, when help
        is enabled, a header comment before its key).
    :param list classes:
        (optional) The list of other classes in the config file,
        used to reduce redundant help descriptions.
        If given, only params from these classes reported.
    :param config:
        If given, only what is contained there is included in generated yaml,
        with help-descriptions from classes, only where class default-values
        differ from the values contained in this dictionary.
    """
    from ..utils import yamlutil as yu
    from ruamel.yaml.comments import CommentedMap
    import textwrap as tw

    cfg = CommentedMap()
    for name, trait in sorted(cls.class_traits(config=True).items()):
        if config is None:
            ## No configs given: dump the default of every configurable trait.
            default_value = trait.default()
            cfg[name] = yu.preserve_yaml_literals(default_value)
        else:
            ## Dump only traits owned by `cls`, present in `config`
            #  and differing from their default.
            dumpables = _dumpable_trait_value(cls, trait, config)
            if dumpables:
                value, default_value = dumpables
                cfg[name] = yu.preserve_yaml_literals(value)
            else:
                continue

        trait_lines = []
        default_repr = yu.ydumps(default_value)
        ## Multi-line default: start it on its own line and indent it,
        #  presumably to align under the 'Default: ' label - TODO confirm.
        if default_repr and default_repr.count('\n') > 1 and default_repr[0] != '\n':
            default_repr = tw.indent('\n' + default_repr, ' ' * 9)

        ## NOTE(review): `_dump_trait_help` looks like a context-var flag
        #  enabling help comments - confirm in `yamlutil`.
        if yu._dump_trait_help.get():
            if classes:
                defining_class = cls._defining_class(trait, classes)
            else:
                defining_class = cls
            if defining_class is cls:
                # cls owns the trait, show full help
                if trait.help:
                    trait_lines.append('')
                    trait_lines.append(_make_comment(trait.help).strip())
                if 'Enum' in type(trait).__name__:
                    # include Enum choices
                    trait_lines.append('Choices: %s' % trait.info())
                trait_lines.append('Default: %s' % default_repr)
            else:
                # Trait appears multiple times and isn't defined here.
                # Truncate help to first line + "See also Original.trait"
                if trait.help:
                    trait_lines.append(_make_comment(trait.help.split('\n', 1)[0]))
                trait_lines.append('See also: %s.%s' % (defining_class.__name__, name))

            cfg.yaml_set_comment_before_after_key(name, before='\n'.join(trait_lines), indent=2)

    ## Classes without anything to dump leave no (empty) section behind.
    if not cfg:
        return

    outer_cfg[cls.__name__] = cfg
    if yu._dump_trait_help.get():
        # section header
        breaker = '#' * 76
        parent_classes = ', '.join(
            p.__name__ for p in cls.__bases__
            if issubclass(p, trc.Configurable)
        )

        s = "%s(%s) configuration" % (cls.__name__, parent_classes)
        head_lines = ['', '', breaker, s, breaker]
        # get the description trait
        desc = class_help_description_lines(cls)
        if desc:
            head_lines.append(_make_comment('\n'.join(desc)))
        outer_cfg.yaml_set_comment_before_after_key(
            cls.__name__, '\n'.join(head_lines))
trc.Configurable.class_config_yaml = classmethod(class_config_yaml)  # type: ignore


def order_class_hierarchy(classes, mro_func=lambda cls: cls.mro()):
    """
    Flatten the MROs of `classes` into a single list, without duplicates.

    Classes are visited in the order given;  a class already met in the MRO
    of a previous one is skipped.  When the MRO of a visited class overlaps
    with classes met before, those are dropped from their old positions and
    the whole MRO is appended at the end.

    :param classes:
        the classes to order
    :param mro_func:
        a callable producing the MRO list of a class
        (by default, the class's own ``mro()``)
    :return:
        a list with the full MROs (so possibly more classes than those given)
    """
    ## Maps every class met in some MRO --> the visited class that brought it.
    visited_index = {}
    ordered = []

    def visit_class(cls):
        nonlocal ordered

        assert cls not in visited_index, (cls, visited_index, classes)
        mro = mro_func(cls)
        if set(mro) & visited_index.keys():
            ## Part of this MRO is already listed:
            #  drop those entries, they are re-appended just below.
            #
            ordered = [c for c in ordered if c not in mro]
        ordered.extend(mro)

        ## Mark the whole MRO as met.
        #
        for sub in mro:
            visited_index[sub] = cls

    for cls in classes:
        if cls not in visited_index:
            visit_class(cls)

    return ordered


def generate_class_hierarchy_text(classes):
    """
    Render one line per class, with its name padded and any listed bases after an arrow.

    :param classes:
        the classes to render, in order;  only the direct bases
        contained in this collection are mentioned
    :return:
        the lines joined with newlines (an empty string if no classes given)
    """
    ## Loop-invariants: column width and formats are the same for all lines.
    name_width = max((len(c.__name__) for c in classes), default=0)
    plain_fmt = ' %%-%is' % name_width
    derived_fmt = ' %%-%is --> %%s' % name_width

    def class_line(cls):
        bases = [base for base in cls.__bases__ if base in classes]
        if bases:
            return derived_fmt % (cls.__name__, ', '.join(b.__name__ for b in bases))
        return plain_fmt % cls.__name__

    return '\n'.join(class_line(cls) for cls in classes)
def generate_config_file_yaml(self, classes=None, config: trc.Config = None):
    """
    Collect the config sections of the configurable classes into a yaml mapping.

    :param classes:
        the classes to dump;  :attr:`classes` of this app is used when `None`
    :param config:
        If given, only what is contained there is included in generated yaml,
        with help-descriptions from classes, only where class default-values
        differ from the values contained in this dictionary.
    :return:
        a ``CommentedMap`` filled in by the ``class_config_yaml()``
        of each class
    """
    from ruamel.yaml.comments import CommentedMap
    import ipython_genutils.text as tw
    from ..utils import yamlutil as yu

    if classes is None:
        classes = self.classes
    config_classes = set(self._classes_with_config_traits(classes))

    ## Order: subclass at the top.
    #  Filtering needed because `order_class_hierarchy()` brings full mro().
    ordered_classes = []
    for cls in order_class_hierarchy(classes):
        if cls in config_classes:
            ordered_classes.append(cls)
    hierarchy_text = generate_class_hierarchy_text(ordered_classes)

    cfg = CommentedMap()

    if yu._dump_trait_help.get():
        title = tw.dedent(""" ############################################################################ Configuration hierarchy for `%s`: %s ############################################################################ """)
        start_comment = title % (self.root_object().name, hierarchy_text)
        cfg.yaml_set_start_comment(start_comment)

    ## Inverted order from title's class-list, above.
    for cls in reversed(ordered_classes):
        cls.class_config_yaml(cfg, config=config)

    return cfg


trc.Application.generate_config_file_yaml = generate_config_file_yaml  # type: ignore