# -*- coding: utf-8 -*-
# Copyright 2015-2018 European Commission (JRC);
# Licensed under the EUPL 1.2+ (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at:
"""The code of *polyvers* shell-commands."""

from collections import OrderedDict, defaultdict, Mapping
from pathlib import Path
from typing import Dict
from typing import Tuple, Set, List, Optional  # noqa: F401 @UnusedImport, flake8 blind in funcs
import logging

from boltons.setutils import IndexedSet as iset

import os.path as osp
import polyversion as pvlib
import textwrap as tw

from . import APPNAME, __version__, __updated__, pvtags, pvproject
from ._vendor import traitlets as trt
from ._vendor.traitlets import config as trc
from ._vendor.traitlets.traitlets import (
    List as ListTrait, Tuple as TupleTrait, Dict as DictTrait)
from ._vendor.traitlets.traitlets import Bool, Unicode
from .cmdlet import cmdlets, autotrait
from .utils import fileutil as fu, yamlutil as yu

log = logging.getLogger(__name__)

## Config sources ##

[docs]def merge_dict(dct, merge_dct): """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of updating only top-level keys, dict_merge recurses down into dicts nested to an arbitrary depth, updating keys. The ``merge_dct`` is merged into ``dct``. :param dct: dict onto which the merge is executed :param merge_dct: dct merged into dct :return: None Adapted from: """ for k in merge_dct.keys(): if (k in dct and isinstance(dct[k], dict) and isinstance(merge_dct[k], Mapping)): merge_dict(dct[k], merge_dct[k]) else: dct[k] = merge_dct[k]
[docs]class PolyversCmd(cmdlets.Cmd, yu.YAMLable): """ Bump independently PEP-440 versions of sub-project in Git monorepos. SYNTAX: {cmd_chain} <sub-cmd> ... """ version = __version__ # type: ignore # mypy complains with engraved value??g. examples = Unicode(""" - Let it guess the configurations for your monorepo:: {cmd_chain} init You may specify different configurations paths with:: {cmd_chain} --config-paths /foo/bar/:~/{config_basename}.yaml:. - Use then the main sub-commands:: {cmd_chain} init # (optional) use it once, or to update configs. {cmd_chain} status {cmd_chain} bump 0.0.1.dev0 -c '1st commit, untagged' {cmd_chain} bump -t 'Mostly model changes, tagged' """) classes = [pvproject.Project] projects = ListTrait( autotrait.AutoInstance(pvproject.Project), config=True) @trt.default('subcommands') def _subcommands(self): subcmds = OrderedDict() subcmds['bump'] = ('polyvers.bumpcmd.BumpCmd', "Increase or set (sub-)project version(s).") subcmds.update(cmdlets.build_sub_cmds(InitCmd, StatusCmd, LogconfCmd)) subcmds['config'] = ( 'polyvers.cmdlet.cfgcmd.ConfigCmd', "Commands to inspect configurations and other cli infos.") return subcmds
[docs] def collect_app_infos(self): """Provide extra infos to `config infos` subcommand.""" return { 'version': __version__, 'updated': __updated__, ## TODO: add more app-infos. }
@trt.default('all_app_configurables') def _all_app_configurables(self): from . import bumpcmd return [type(self), pvproject.Project, InitCmd, StatusCmd, bumpcmd.BumpCmd, LogconfCmd, pvproject.Engrave, pvproject.Graft, ] @trt.default('config_paths') def _config_paths(self): basename = self.config_basename paths = [] git_root = fu.find_git_root() if git_root: paths.append(str(git_root / basename)) else: paths.append('.') paths.append('~/%s' % basename) return paths curdir = Unicode( config=True, help="Work as if changed in the given current-dir.")
[docs] @trc.catch_config_error def parse_command_line(self, argv=None): super().parse_command_line.__wrapped__(self, argv) # not to re-catch_config_error ## Chdir as soon as possible. # if self.curdir:'Switching to dir: %s', self.curdir) import os try: os.chdir(self.curdir) except Exception as ex: raise cmdlets.CmdException( "Cannot switch to dir '%s' due to: %s" % (self.curdir, ex)) from ex
_git_root: Optional[Path] = None @property def git_root(self) -> Path: if self._git_root is None: self._git_root = fu.find_git_root() if not self._git_root: raise pvtags.NoGitRepoError( "Current-dir '%s' is not inside a git-repo!" % Path().resolve()) return self._git_root pdata = DictTrait( key_trait=Unicode(), value_trait=Unicode(), config=True, help=""" Pairs of (pname: basepath) for the projects in the repo. - This param exists only when specifying those data from cmdline; otherwise, in configuration files prefer to specify directly `PolyversCmd.projects`. - example:: --pdata foo=foo/fpath """) autodiscover_subproject_projects = ListTrait( autotrait.AutoInstance(pvproject.Project), default_value=[{ 'engraves': [{ 'name': '', 'globs': ['**/'], 'grafts': [{ 'name': 'project-name', 'regex': tw.dedent(r''' (?xm) \b(name|PROJECT|APPNAME|APPLICATION) \ *=\ * (['"]) (?P<pname>[\w\-.]+) \2 '''), 'slices': -1 }] }] }], allow_none=True, config=True, help=""" Projects with globs/regexes that can autodiscover sub-project basepaths/names. - Needed when no configuration file is given (or has partial infos). - The glob-patterns contained in this `Project[Engrave[Graft]]` should match files in the root dir of auto-discovered-projects (`Graft.subst` is not used here). - `Project.basepath` must denote the project-root relative to the globbed file's dir ('.' assumed). - Project name(s) must be captured by the `Graft`. If none (or different ones) match, project detection fails. - A Project is identified only if file(s) are globbed AND regexp(s) matched. - User should fallback to --data if it fails. """) autodiscover_version_scheme_projects = TupleTrait( autotrait.AutoInstance(pvproject.Project), autotrait.AutoInstance(pvproject.Project), default_value=({ 'pname': '<PVTAG>', 'tag_vprefixes': pvlib.tag_vprefixes, 'pvtag_format': '*-v*', 'pvtag_regex': tw.dedent(r""" (?xmi) ^(?P<pname>[A-Z0-9]|[A-Z0-9][A-Z0-9._-]*?[A-Z0-9]) - v(?P<version>\d[^-]*) (?:-(?P<descid>\d+-g[a-f\d]+))?$ """)}, { 'pname': '<VTAG>', 'tag_vprefixes': pvlib.tag_vprefixes, 'pvtag_format': pvlib.vtag_format, 'pvtag_regex': pvlib.vtag_regex, }), config=True, help=""" A pair of Projects with patterns/regexps matching *pvtags* or *vtags*, respectively. - Needed when a) no configuration file is given (or has partial infos), and b) when constructing/updating the configuration file. - Screams if discovered same project-name with conflicting basepaths. """) def _autodiscover_project_basepaths(self) -> Dict[str, Path]: """ Invoked when no config exists (or asked to updated it) to guess projects. :return: a mapping of {pnames: basepaths} """ from . import engrave if not self.autodiscover_subproject_projects: raise cmdlets.CmdException( "No `Polyvers.autodiscover_subproject_projects` param given!") fproc = engrave.FileProcessor(parent=self) with self.errlogged(doing='discovering project paths', scan_projects = self.autodiscover_subproject_projects #: Dict[Path, #: List[Tuple[pvproject.Project, Engrave, Graft, List[Match]]]] match_map = fproc.scan_projects(scan_projects) ## Accept projects only if one, and only one, # pair (pname <--> path) matched. # pname_path_pairs: List[Tuple[str, Path]] = [ (match.groupdict()['pname'].decode('utf-8'), fpath.parent / (prj.basepath or '.')) for fpath, mqruples in match_map.items() for prj, _eng, _graft, match in mqruples] unique_pname_paths = iset(pname_path_pairs) ## check basepath conflicts. # projects: Dict[str, Path] = {} dupe_projects: Dict[str, Set[Path]] = defaultdict(set) for pname, basepath in unique_pname_paths: dupe_basepath = projects.get(pname) if dupe_basepath and dupe_basepath != basepath: dupe_projects[pname].add(basepath) else: projects[pname] = basepath if dupe_projects: raise cmdlets.CmdException( "Discovered conflicting project-basepaths: %s" % yu.ydumps(dupe_basepath)) return projects def _autodiscover_versioning_scheme(self): """ Guess whether *monorepo* or *mono-project* versioning schema applies. :return: one of :func:`pvtags.make_vtag_project`, :func:`pvtags.make_pvtag_project` """ pvtag_proj, vtag_proj = self.autodiscover_version_scheme_projects pvtags.populate_pvtags_history(pvtag_proj, vtag_proj) if bool(pvtag_proj.pvtags_history) ^ bool(vtag_proj.pvtags_history): return (pvtags.make_pvtag_project(parent=self) if pvtag_proj.pvtags_history else pvtags.make_vtag_project(parent=self)) else: raise cmdlets.CmdException( "Cannot auto-discover versioning scheme, " "missing or contradictive versioning-tags:\n%s" "\n\n Try --monorepo/--mono-project flags." % yu.ydumps({'pvtags': pvtag_proj.pvtags_history, 'vtags': vtag_proj.pvtags_history}))
[docs] def bootstrapp_projects(self) -> None: """ Ensure valid configuration exist for monorepo/mono-project(s). :raise CmdException: if cwd not inside a git repo """ git_root = self.git_root template_project = pvproject.Project(parent=self) has_template_project = template_project.is_good() if not has_template_project: template_project = self._autodiscover_versioning_scheme()"Auto-discovered versioning scheme: %s", template_project.pname) ## Store template-project for InitCmd. self._template_project = template_project has_subprojects = bool(self.projects) if not has_subprojects: pdata: Dict[str, Path] = {pname: Path(basepath) for pname, basepath in self.pdata.items()} if not pdata: pdata = self._autodiscover_project_basepaths() if not pdata: raise cmdlets.CmdException( "Cannot auto-discover (sub-)project path(s)!" "\n Please use `--pdata <pname>=<basepath> ...` " "to specify sub-project(s) explicitly.") "Auto-discovered %i sub-project(s) in git-root '%s': \n%s", len(pdata), git_root.resolve(), yu.ydumps({k: str(v) for k, v in pdata.items()})) self.projects = [template_project.replace(pname=pname, basepath=basepath, _pvtags_collected=None) for pname, basepath in pdata.items()] ## Set discovered projects in `config` for InitCmd. self.config.PolyversCmd.projects = [ {'pname': pname, 'basepath': basepath} for pname, basepath in pdata.items()] if len(self.projects) > 1 and template_project.pname == pvtags.MONO_PROJECT: self.log.warning( "Incompatible *vtags* version-scheme with %s sub-projects!" "\n You may ether switch to *pvtags* (see `--monorepo`) or " "\n override projects to a single one (see `--pdata`). " "\n\n You can then make the choice permanent using `init` cmd.", len(self.projects)) return self.projects
class _SubCmd(PolyversCmd): def __init__(self, *args, **kw): self.subcommands = {} super().__init__(*args, **kw) _init_update_help = \ "Update existing configs, excluding user's home folder and those overriden by cmd-line options" _init_doc_help = \ "Include class/param descriptions/defaults in the dumped yaml."
[docs]class InitCmd(_SubCmd): """Generate configurations based on directory contents.""" update = Bool( config=True, help=_init_update_help) doc = Bool( config=True, help=_init_doc_help) flags = { # type: ignore ('u', 'update'): ({'InitCmd': {'update': True}}, _init_update_help), 'doc': ({'InitCmd': {'doc': True}}, _init_doc_help), } def _read_non_user_configs(self) -> trc.Config: "Avoid writting user-specific configs as project ones." config_paths = [p for p in self.config_paths if not p.startswith('~')] return self.read_config_files(config_paths)
[docs] @trc.catch_config_error def initialize(self, argv=None): ## Overridden, to skip configs-loading unless --update given. self.update_interp_context() self.parse_command_line.__wrapped__(self, argv) # not to re-catch_config_error if self.update: config = self._read_non_user_configs() config.merge(self.cli_config) if self.show_config or self.show_config_json: self._dump_config() while self: self.update_config(config) self = self.parent
def _cleaned_config(self): config = self.config clean_keys = [ 'InitCmd.update', 'InitCmd.doc', 'Spec.*', '*.verbose', '*.debug', '*.dry_run', '*.force', ] for path in clean_keys: sec, tname = path.split('.') if sec == '*' or sec in config: if tname == '*': del config[sec] else: sec = config[sec] if tname in sec: del sec[tname] return config def _make_yaml_config(self) -> trc.Config: old_config = self._cleaned_config() tproj = self._template_project assert tproj and tproj.is_good(), "Bootstrap template: %s" % tproj ## Create explicetely `Project` into config, # bc program has does not hold such toplevel class. self.config.Project = trc.Config({ 'tag_vprefixes': tproj.tag_vprefixes, 'pvtag_format': tproj.pvtag_format, 'pvtag_regex': tproj.pvtag_regex, }) _t = yu._dump_trait_help.set(self.doc) try: return self.generate_config_file_yaml( # type: ignore # meth monkeypatched self.all_app_configurables, old_config) finally: yu._dump_trait_help.reset(_t)
[docs] def run(self, *args): if len(args) > 0: raise cmdlets.CmdException( "Cmd %r takes no arguments, received %d: %r!" % (, len(args), args)) import io self.bootstrapp_projects() new_config = self._make_yaml_config() cfgpath = Path(self.git_root) / ('%s.yaml' % self.config_basename) if self.log.isEnabledFor(logging.DEBUG): from pprint import pformat self.log.debug("Writing config to yaml file '%s': \n%s", cfgpath, pformat(new_config)) if self.dry_run: sink = io.StringIO() yu.ydumps(new_config, sink, trait_help=self.doc) self.log.warning('PRETEND init: %s' % sink.getvalue()) else: with, 'wt', encoding='utf-8') as fout: yu.ydumps(new_config, fout, trait_help=self.doc) self.log.notice("Created a %s config-file '%s' for %i projects: %s", self._template_project.pname, cfgpath, len(self.projects), ''.join('\n - %s: %s' % (p.pname, p.basepath) for p in self.projects))
_status_all_help = """ When true, fetch also all version-tags, otherwise just project version-id(s). """ def _git_desc_without_screams(proj): try: return proj.git_describe() except pvtags.GitVoidError as _: return None
[docs]class StatusCmd(_SubCmd): """ List the versions of project(s). SYNTAX: {cmd_chain} [OPTIONS] [<project>]... """ all = Bool( # noqa: A003 config=True, help=_status_all_help) flags = {('a', 'all'): ({'StatusCmd': {'all': True}}, _status_all_help)} # type: ignore def _describe_projects(self, projects): return [_git_desc_without_screams(p) or p.pname for p in projects] def _fetch_all(self, projects): ## TODO: YAMLable Project (apart from Printable) with metadata Print/header return [{'pname': p.pname, 'basepath': str(p.basepath), 'gitver': _git_desc_without_screams(p), 'history': p.pvtags_history} for p in projects]
[docs] def run(self, *pnames): projects = self.bootstrapp_projects() if pnames: ## TODO: use _filter_projects_by_name() projects = [p for p in projects if p.pname in pnames] ## TODO: extract method to classify pre-populated histories. pvtags.populate_pvtags_history(*projects) if self.all: res = self._fetch_all(projects) else: res = self._describe_projects(projects) if res: return yu.ydumps(res)
[docs]class LogconfCmd(_SubCmd): """Write a logging-configuration file that can filter logs selectively."""
[docs] def run(self, *args): pass
# TODO: Will work when patched: PolyversCmd.config_paths.tag(envvar=CONFIG_VAR_NAME) trc.Application.raise_config_file_errors.tag(config=True) = \ 'Whether failing to load config files should prevent startup.' PolyversCmd.flags = { # type: ignore ## Copied from Application # 'show-config': ({ 'Application': { 'show_config': True, }, },, 'show-config-json': ({ 'Application': { 'show_config_json': True, }, },, ## Consulted by main.init_logging() if in sys.argv. # ('v', 'verbose'): ( {'Spec': {'verbose': True}}, ), ('n', 'dry-run'): ( {'Spec': {'dry_run': True}}, ), ('d', 'debug'): ({ 'Spec': { 'debug': True, }, 'Application': { 'show_config': True, 'raise_config_file_errors': True, }}, ), 'monorepo': ( {'Project': { # type: ignore 'pname': pvtags.MONOREPO, 'pvtag_format': pvlib.pvtag_format, 'pvtag_regex': pvlib.pvtag_regex, }}, """ Select *pvtags* version-scheme, suitable for monorepos hosting multiple sub-projects. """ ## TODO: Conflicting vscheme flags possible! ), 'mono-project': ( {'Project': { # type: ignore 'pname': pvtags.MONO_PROJECT, 'pvtag_format': pvlib.vtag_format, 'pvtag_regex': pvlib.vtag_regex, }}, """ Select *vtags* version-scheme, suitable for repos hosting a single project. - Flag may be required if autodiscovery of version-scheme fails because none (or both) vatgs/pvatgs exist in git repo. - Use `init` cmd not to give this flag on every command run. """ ## TODO: Conflicting vscheme flags possible! ), } PolyversCmd.aliases = { # type: ignore ('C', 'curdir'): 'PolyversCmd.curdir', ('f', 'force'): 'Spec.force', ('p', 'pdata'): 'PolyversCmd.pdata', } += """ Supported tokens: 'fread' : don't stop engraving on file-reading errors. 'fwrite' : don't stop engraving on file-writting errors. 'foverwrite': overwrite existing file. 'glob' : keep-going even if glob-patterns are invalid. 'tag' : replace existing tag. """
[docs]def run(argv=(), cmd_consumer=None, **app_init_kwds): """ Handle some exceptions politely and return the exit-code. :param argv: Cmd-line arguments, nothing assumed if nohing given. :param cmd_consumer: Specify a different main-mup, :class:`mpu.PrintConsumer` by default. See :func:`mpu.pump_cmd()`. """ ## At these early stages, any log cmd-line option # enable DEBUG logging ; later will be set by `baseapp` traits. from .utils import logconfutils as mlu log_level, argv = mlu.log_level_from_argv( argv, start_level=25, # 20=INFO, 25=NOTICE (when patched), 30=WARNING eliminate_quiet=True) log = logging.getLogger('%s.main' % APPNAME) logconf_yaml = osp.join('~', '.%s-logconf.yaml' % APPNAME) mlu.init_logging(level=log_level, logconf=logconf_yaml) ## Imports in separate try-block due to CmdException. # try: from .utils import mainpump as mpu from ._vendor.traitlets import TraitError from .cmdlet.errlog import CollectedErrors except Exception as ex: ## Print stacktrace to stderr and exit-code(-1). return mlu.exit_with_pride(ex, logger=log) try: cmd = PolyversCmd.make_cmd(argv, **app_init_kwds) # @UndefinedVariable return mpu.pump_cmd(cmd.start(), consumer=cmd_consumer) and 0 except (cmdlets.CmdException, TraitError) as ex: log.debug('App exited due to: %r', ex, exc_info=1) ## Suppress stack-trace for "expected" errors but exit-code(1). msg = str(ex) ## Hide some exception-names: # - CmdEx: does not offer anything # - CollectedErrors: msg start with "Collected 2 errors..." if type(ex) not in (cmdlets.CmdException, CollectedErrors): msg = '%s: %s' % (type(ex).__name__, ex) return mlu.exit_with_pride(msg, logger=log) except Exception as ex: ## Log in DEBUG not to see exception x2, but log it anyway, # in case log has been redirected to a file. log.debug('App failed due to: %r', ex, exc_info=1) ## Print stacktrace to stderr and exit-code(-1). return mlu.exit_with_pride(ex, logger=log)