Source code for metomi.rose.macro

# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""
.. testsetup:: *

    import os
    from metomi.rose.macro import *

    def test_cleanup(stuff_to_remove):
        for item in stuff_to_remove:
            try:
                os.remove(item)
            except OSError:
                try:
                    os.rmdir(item)
                except OSError:
                    pass

Module to list or run available custom macros for a configuration.

It also stores macro base classes and macro library functions.

"""

import ast
import copy
from functools import cmp_to_key
import glob
from importlib.machinery import SourceFileLoader
import inspect
import os
import re
import sys
import traceback

import metomi.rose.config
from metomi.rose.config import ConfigNode
import metomi.rose.config_tree
import metomi.rose.formats.namelist
from metomi.rose.opt_parse import RoseOptionParser
import metomi.rose.reporter
import metomi.rose.resource
import metomi.rose.variable

# Method names recognised on macro classes.
ALLOWED_MACRO_CLASS_METHODS = [
    "transform",
    "validate",
    "downgrade",
    "upgrade",
    "report",
]

# Message templates; the "{N}" placeholders are filled in with str.format.
ERROR_LOAD_CONFIG_DIR = "{0}: not an application or suite directory.\n"
ERROR_LOAD_MACRO = "Could not load macro {0}: {1}"
ERROR_LOAD_METADATA = "Could not load metadata {0}\n"
ERROR_LOAD_CHOSEN_META_PATH = (
    "Could not find metadata for {0}, using {1}\n"
    "To suppress these warnings run 'rose edit --no-warn version'"
)
ERROR_LOAD_META_PATH = "Could not find metadata for {0}"
ERROR_LOAD_CONF_META_NODE = "Error: could not find meta flag"
ERROR_MACRO_CASE_MISMATCH = (
    "Error: case mismatch; \n {0} does not match {1},"
    " please only use lowercase."
)
ERROR_MACRO_NOT_FOUND = "Error: could not find macro {0}\n"
ERROR_NO_MACRO_HELP = "No help docstring provided, macro \"{0}\"."
ERROR_NO_MACROS = "Please specify a macro name.\n"
ERROR_RETURN_TYPE = "{0}: {1}: invalid returned type: {2}, expect {3}"
ERROR_RETURN_VALUE = "{0}: incorrect return value"
ERROR_RETURN_VALUE_STATE = "{0}: node.state: invalid returned value"
# Relative directory "lib/python/<META_DIR_MACRO>" (built with the
# platform's path separator).
MACRO_DIRNAME = os.path.join(
    os.path.join("lib", "python"), metomi.rose.META_DIR_MACRO
)
ERROR_OUT_DIR_MULTIPLE_APPS = (
    "Cannot specify an output dir when running" " macro over multiple apps."
)
# File extension of macro modules.
MACRO_EXT = ".py"

# Output templates for macro listings and results.
MACRO_OUTPUT_HELP = "    # {0}\n"
MACRO_OUTPUT_ID = "[{0}] {1}"
MACRO_OUTPUT_TRANSFORM_CHANGES = "{0}: changes: {1}\n"
MACRO_OUTPUT_VALIDATE_ISSUES = "{0}: issues: {1}\n"
MACRO_OUTPUT_WARNING_ISSUES = "{0}: warnings: {1}\n"
OPT_CONFIG_REPORT = "(opts={0})"

# Regular expressions for picking apart setting ids.
# Matches a "{...}" group anywhere in the string.
REC_MODIFIER = re.compile(r"\{.+\}")
# Matches a "(...)" group that contains no nested parentheses.
REC_ID_STRIP_DUPL = re.compile(r"\([^()]+\)")
# Matches an optional "{...}" followed by an optional "(...)" at the end of
# the string.
REC_ID_STRIP = re.compile(r'(?:\{.+\})?(?:\([^()]+\))?$')
# Captures the contents of a trailing "(...)" group.
REC_ID_ELEMENT = re.compile(r"\(([^()]+)\)$")
# Captures the digits of a trailing "(<digits>)" group.
REC_ID_SINGLE_ELEMENT = re.compile(r"\((\d+)\)$")
# Template producing "<id>(<element>)".
ID_ELEMENT_FORMAT = "{0}({1})"

# Report formatting and interactive prompt strings.
PROBLEM_ENTRY = "    {0}{1}={2}={3}\n        {4}\n"
PROMPT_ACCEPT_CHANGES = "Accept y/n (default n)? "
PROMPT_OK = "y"
SETTING_ID = "    {0}={1}={2}\n        {3}"

# Names of the macro methods looked up by attribute name.
TRANSFORM_METHOD = "transform"
VALIDATE_METHOD = "validate"
REPORT_METHOD = "report"
VERBOSE_LIST = "{0} - ({1}) - {2}"


class MacroFinishNothingEvent(metomi.rose.reporter.Event):

    """Event emitted when macros produced neither issues nor changes."""

    # Reported at the reporter's "VV" level.
    LEVEL = metomi.rose.reporter.Event.VV

    def __str__(self):
        """Return the fixed summary text for this event."""
        summary = "Configurations OK"
        return summary


class MacroLoadError(Exception):

    """Error raised when importing a macro fails with an exception.

    The first two positional arguments are substituted, in order, into
    the ``ERROR_LOAD_MACRO`` template.

    """

    def __str__(self):
        """Return the formatted "could not load macro" message."""
        macro_name, reason = self.args[0], self.args[1]
        return ERROR_LOAD_MACRO.format(macro_name, reason)


class MacroNotFoundError(NameError):

    """Error raised when no macro with the requested name exists.

    The first positional argument is substituted into the
    ``ERROR_MACRO_NOT_FOUND`` template.

    """

    def __str__(self):
        """Return the formatted "could not find macro" message."""
        macro_name = self.args[0]
        return ERROR_MACRO_NOT_FOUND.format(macro_name)


class MacroTransformDumpEvent(metomi.rose.reporter.Event):

    """Event emitted when a transformed configuration is written out.

    Expects two positional arguments; the second may be ``None``, in
    which case only the first is shown.

    """

    def __str__(self):
        """Return "M <first>" or "M <first> -> <second>"."""
        target = self.args[1]
        source = self.args[0]
        if target is not None:
            return "M %s -> %s" % (source, target)
        return "M %s" % source


class MacroReturnedCorruptConfigError(TypeError):

    """Error raised when the config handed back by a macro is corrupt."""

    def __str__(self):
        """Return the message with the first argument as the detail."""
        detail = self.args[0]
        return "Macro tried to corrupt the config: %s" % detail


class MacroValidateError(Exception):

    """Error raised when validation parsing cannot be carried out.

    All positional arguments are rendered into one space-separated
    string, stored as ``args_string``; exception arguments are shown as
    their type followed by their message.

    """

    def __init__(self, *args):
        parts = []
        for item in args:
            if issubclass(type(item), Exception):
                # Show which kind of exception it was, not just its text.
                parts.append(str(type(item)) + " " + str(item))
            else:
                parts.append(str(item))
        self.args_string = " ".join(parts)
        # The base class is deliberately initialised without arguments.
        super().__init__()

    def __str__(self):
        """Return the validation failure message."""
        prefix = 'Could not perform validation: '
        return prefix + self.args_string


class MetaConfigFlagMissingError(Exception):

    """Error raised when a configuration has no meta= flag."""

    def __str__(self):
        """Return the fixed "could not find meta flag" message."""
        message = ERROR_LOAD_CONF_META_NODE
        return message


class MacroBase:

    """Base class for macros for validating or transforming configurations.

    Synopsis:
        >>> import metomi.rose.macro
        ...
        >>> class SomeValidator(metomi.rose.macro.MacroBase):
        ...
        ...     '''Important: Add a docstring for your macro like this.
        ...
        ...     A macro class should implement one of the following methods:
        ...
        ...     '''
        ...
        ...     def validate(self, config, meta_config=None):
        ...         # Some check on config appends to self.reports using
        ...         # self.add_report.
        ...         return self.reports
        ...
        ...     def transform(self, config, meta_config=None):
        ...         # Some operation on config which calls self.add_report
        ...         # for each change.
        ...         return config, self.reports
        ...
        ...     def report(self, config, meta_config=None):
        ...         # Perform some analysis of the config but return nothing.
        ...         pass

    Keyword arguments can be used, ``rose macro`` will prompt the user to
    provide values for these arguments when the macro is run.

        >>> def validate(self, config, meta_config=None, answer=None):
        ...     # User will be prompted to provide a value for "answer".
        ...     return self.reports

    There is a special keyword argument called ``optional_config_name`` which
    is set to the name of the optional configuration a macro is running on,
    or ``None`` if only the default configuration is being used.

        >>> def report(self, config, meta_config=None,
        ...            optional_config_name=None):
        ...     if optional_config_name:
        ...         print('Macro is being run using the "%s" '
        ...               'optional configuration' % optional_config_name)

    """

    def __init__(self):
        self.reports = []  # MacroReport instances for errors or changes

    def _get_section_option_from_id(self, var_id):
        """Return a configuration section and option from an id."""
        return get_section_option_from_id(var_id)

    def _get_id_from_section_option(self, section, option):
        """Return a variable id from a section and option."""
        return get_id_from_section_option(section, option)

    def _sorter(self, rep1, rep2):
        """Compare two reports, ``cmp``-style, for use with cmp_to_key.

        Reports are ordered by the id built from their section and option
        (delegated to ``metomi.rose.config.sort_settings``); reports with
        the same id are ordered by their values.

        """
        # Sort [section], [section, option], [section, None]
        id1 = self._get_id_from_section_option(rep1.section, rep1.option)
        id2 = self._get_id_from_section_option(rep2.section, rep2.option)
        if id1 == id2:
            # This logic replicates output of the deprecated Python2 `cmp`
            # builtin
            return (rep1.value > rep2.value) - (rep1.value < rep2.value)
        return metomi.rose.config.sort_settings(id1, id2)

    def _load_meta_config(
        self, config, meta=None, directory=None, config_type=None
    ):
        """Return a metadata configuration object.

        If ``meta`` is already a ConfigNode it is returned unchanged,
        otherwise the metadata is loaded via ``load_meta_config``.

        """
        if isinstance(meta, metomi.rose.config.ConfigNode):
            return meta
        return load_meta_config(config, directory, config_type=config_type)

    def get_metadata_for_config_id(self, setting_id, meta_config):
        """Return a dict of metadata properties and values for a setting id.

        Args:
            setting_id (str): The name of the setting to extract metadata
                for.
            meta_config (metomi.rose.config.ConfigNode): Config node
                containing the metadata to extract from.

        Return:
            dict: A dictionary containing metadata options.

        Example:
            >>> # Create a rose app.
            >>> with open('rose-app.conf', 'w+') as app_config:
            ...     _ = app_config.write('''
            ... [foo]
            ... bar=2
            ...     ''')
            >>> os.mkdir('meta')
            >>> with open('meta/rose-meta.conf', 'w+') as meta_config:
            ...     _ = meta_config.write('''
            ... [foo=bar]
            ... values = 1,2,3
            ...     ''')
            ...
            >>> # Load config.
            >>> app_conf, config_map, meta_config = load_conf_from_file(
            ...     '.', 'rose-app.conf')
            ...
            >>> # Extract metadata for foo=bar.
            >>> get_metadata_for_config_id('foo=bar', meta_config)
            {'values': '1,2,3', 'id': 'foo=bar'}

        .. testcleanup:: metomi.rose.macro.MacroBase.get_metadata_for_config_id

           test_cleanup(['rose-app.conf', 'meta/rose-meta.conf', 'meta'])

        """
        return get_metadata_for_config_id(setting_id, meta_config)

    def get_resource_path(self, filename=''):
        """Load the resource according to the path of the calling macro.

        The returned path will be based on the macro location under
        ``lib/python`` in the metadata directory.

        If the calling macro is ``lib/python/macro/levels.py``,
        and the filename is ``rules.json``, the returned path will be
        ``etc/macros/levels/rules.json``.

        Args:
            filename (str): The filename of the resource to request the path
                to.

        Return:
            str: The path to the requested resource.

        """
        # The frame that called this method belongs to the macro's module.
        caller_frame = inspect.currentframe().f_back
        macro_path = os.path.abspath(inspect.getfile(caller_frame))
        macro_name = os.path.basename(macro_path).rpartition('.py')[0]
        macro_root_dir = os.path.dirname(macro_path)
        library_dir = os.path.dirname(os.path.dirname(macro_root_dir))
        root_dir = os.path.dirname(library_dir)
        # root_dir is the directory of the rose-meta.conf file.
        return os.path.join(root_dir, 'etc', 'macros', macro_name, filename)

    def pretty_format_config(self, config):
        """Standardise the keys and values of a config node.

        Args:
            config (metomi.rose.config.ConfigNode): The config node to
                convert.

        """
        pretty_format_config(config)

    def standard_format_config(self, config):
        """Standardise any degenerate representations e.g. namelist repeats.

        Args:
            config (metomi.rose.config.ConfigNode): The config node to
                convert.

        """
        standard_format_config(config)

    def add_report(self, *args, **kwargs):
        """Add a metomi.rose.macro.MacroReport.

        See :class:`metomi.rose.macro.MacroReport` for details of arguments.

        Examples:
            >>> # An example validator macro which adds a report to the
            >>> # setting env=MY_FAVOURITE_STREAM_EDITOR.
            >>> class My_Macro(MacroBase):
            ...     def validate(self, config, meta_config=None):
            ...         editor_value = config.get(
            ...             ['env', 'MY_FAVOURITE_STREAM_EDITOR']).value
            ...         if editor_value != 'sed':
            ...             self.add_report(
            ...                 'env',                         # Section
            ...                 'MY_FAVOURITE_STREAM_EDITOR',  # Option
            ...                 editor_value,                  # Value
            ...                 'Should be "sed"!')            # Message
            ...         return self.reports

        """
        self.reports.append(MacroReport(*args, **kwargs))
class MacroBaseRoseEdit(MacroBase):

    """This class extends MacroBase to provide a non-ConfigNode API.

    In the following methods, config_data can be a
    metomi.rose.config.ConfigNode instance or a dictionary that
    looks like this:
    {"sections":
        {"namelist:foo": metomi.rose.section.Section instance,
         "env": metomi.rose.section.Section instance},
     "variables":
        {"namelist:foo": [metomi.rose.variable.Variable instance,
                          metomi.rose.variable.Variable instance],
         "env": [metomi.rose.variable.Variable instance]}
    }
    This makes it easy to interface with rose edit, which uses the
    latter data structure internally.

    """

    def _get_config_sections(self, config_data):
        """Return all sections within config_data.

        For a ConfigNode, the empty (top-level) section name "" is always
        included. For the dictionary form, the result is the union of the
        keys of its "sections" and "variables" mappings.

        """
        sections = []
        if isinstance(config_data, metomi.rose.config.ConfigNode):
            for key, node in config_data.value.items():
                if isinstance(node.value, dict):
                    sections.append(key)
            if "" not in sections:
                sections.append("")
        else:
            # Dictionary key views cannot be concatenated with "+" on
            # Python 3, so take the set union of the two mappings' keys.
            sections.extend(
                set(config_data["sections"]) | set(config_data["variables"])
            )
        return sections

    def _get_config_section_options(self, config_data, section):
        """Return all options within a section in config_data."""
        if isinstance(config_data, metomi.rose.config.ConfigNode):
            return [keylist[-1] for keylist, _ in config_data.walk([section])]
        return [v.name for v in config_data["variables"].get(section, [])]

    def _get_config_has_id(self, config_data, id_):
        """Return whether the config_data contains the id_."""
        section, option = self._get_section_option_from_id(id_)
        if isinstance(config_data, metomi.rose.config.ConfigNode):
            return config_data.get([section, option]) is not None
        if option is None:
            return section in config_data["sections"]
        return option in [
            v.name for v in config_data["variables"].get(section, [])
        ]

    def _get_config_id_state(self, config_data, id_):
        """Return the ConfigNode.STATE_* that applies to id_ or None."""
        section, option = self._get_section_option_from_id(id_)
        if isinstance(config_data, metomi.rose.config.ConfigNode):
            node = config_data.get([section, option])
            if node is None:
                return None
            return node.state
        # Dictionary form: derive the state from the ignored_reason of the
        # matching section or variable.
        ignored_reason = None
        if option is None:
            if section in config_data["sections"]:
                ignored_reason = config_data["sections"][
                    section
                ].ignored_reason
        else:
            for variable in config_data["variables"].get(section, []):
                if variable.name == option:
                    ignored_reason = variable.ignored_reason
                    break
        if ignored_reason is None:
            # No matching section or variable was found.
            return None
        if metomi.rose.variable.IGNORED_BY_USER in ignored_reason:
            return metomi.rose.config.ConfigNode.STATE_USER_IGNORED
        if metomi.rose.variable.IGNORED_BY_SYSTEM in ignored_reason:
            return metomi.rose.config.ConfigNode.STATE_SYST_IGNORED
        return metomi.rose.config.ConfigNode.STATE_NORMAL

    def _get_config_id_value(self, config_data, id_):
        """Return a value (if any) for id_ in the config_data."""
        section, option = self._get_section_option_from_id(id_)
        if option is None:
            # A bare section has no value.
            return None
        if isinstance(config_data, metomi.rose.config.ConfigNode):
            node = config_data.get([section, option])
            if node is None:
                return None
            return node.value
        for variable in config_data["variables"].get(section, []):
            if variable.name == option:
                return variable.value
        return None


class MacroValidatorCollection(MacroBase):

    """Collate several macros into one."""

    def __init__(self, *macros):
        self.macros = macros
        super(MacroValidatorCollection, self).__init__()

    def validate(self, config, meta_config=None):
        """Run every member macro's validate method and pool the reports.

        Macros without a validate method are skipped. Each macro's
        reports are sorted before being appended to ``self.reports``,
        which is returned.

        """
        for macro_inst in self.macros:
            if not hasattr(macro_inst, VALIDATE_METHOD):
                continue
            macro_method = getattr(macro_inst, VALIDATE_METHOD)
            p_list = macro_method(config, meta_config)
            p_list.sort(key=cmp_to_key(self._sorter))
            self.reports += p_list
        return self.reports


class MacroTransformerCollection(MacroBase):

    """Collate several macros into one."""

    def __init__(self, *macros):
        self.macros = macros
        super(MacroTransformerCollection, self).__init__()

    def transform(self, config, meta_config=None):
        """Run every member macro's transform method in turn.

        Macros without a transform method are skipped. The config
        returned by each macro is passed on to the next one. Each macro's
        reports are sorted before being appended to ``self.reports``.

        Returns the final config and ``self.reports``.

        """
        for macro_inst in self.macros:
            if not hasattr(macro_inst, TRANSFORM_METHOD):
                continue
            macro_method = getattr(macro_inst, TRANSFORM_METHOD)
            config, c_list = macro_method(config, meta_config)
            c_list.sort(key=cmp_to_key(self._sorter))
            self.reports += c_list
        return config, self.reports
class MacroReport:

    """Class to hold information about a macro issue.

    Arguments:
        section (str): The name of the section to attach this report to.
        option (str): The name of the option (within the section) to
            attach this report to.
        value (object): The value of the configuration associated with this
            report.
        info (str): Text information describing the nature of the report.
        is_warning (bool): If True then this report will be logged as a
            warning.

    Two reports are equal when all five fields are equal. Hashing a
    report requires its ``value`` to be hashable.

    Example:
        >>> report = MacroReport('env', 'WORLD', 'Earth',
        ...                      'World changed to Earth', True)

    """

    def __init__(
        self,
        section=None,
        option=None,
        value=None,
        info=None,
        is_warning=False,
    ):
        self.section = section
        self.option = option
        self.value = value
        self.info = info
        self.is_warning = is_warning

    def _key(self):
        """Return the tuple of fields that defines identity."""
        return (
            self.section,
            self.option,
            self.value,
            self.info,
            self.is_warning,
        )

    def __repr__(self):
        return (
            "<MacroReport section=%s option=%s value=%s info=%s "
            + "is_warning=%s>"
        ) % (self.section, self.option, self.value, self.info, self.is_warning)

    def __eq__(self, other):
        # Compare the fields themselves rather than their hashes: distinct
        # values can share a hash, and unhashable values cannot be hashed
        # at all.
        if not isinstance(other, MacroReport):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self):
        return hash(self._key())
def add_meta_paths(): """Call add_site_meta_paths and add_env_meta_paths.""" add_site_meta_paths() add_env_meta_paths() def add_site_meta_paths(): """Load any metadata paths specified in a user or site configuration.""" conf = metomi.rose.resource.ResourceLocator.default().get_conf() path = conf.get_value( [metomi.rose.CONFIG_SECT_TOP, metomi.rose.CONFIG_OPT_META_PATH] ) if path is not None: for path in path.split(os.pathsep): path = os.path.expanduser(os.path.expandvars(path)) sys.path.insert(0, os.path.abspath(path)) sys.path.append( str(metomi.rose.resource.ResourceLocator.default().locate('rose-meta')) ) def add_env_meta_paths(): """Load the environment variable ROSE_META_PATH, if defined.""" path = os.environ.get("ROSE_META_PATH") if path is not None: for path in path.split(os.pathsep): path = os.path.expanduser(os.path.expandvars(path)) sys.path.insert(0, os.path.abspath(path)) def add_opt_meta_paths(meta_paths): """Load any metadata paths in a list of ":"-separated strings.""" if meta_paths is not None: meta_paths.reverse() for child_paths in [arg.split(os.pathsep) for arg in meta_paths]: child_paths.reverse() for path in child_paths: path = os.path.expandvars(os.path.expanduser(path)) sys.path.insert(0, os.path.abspath(path)) def get_section_option_from_id(var_id): """Return a configuration section and option from an id.""" section_option = var_id.split(metomi.rose.CONFIG_DELIMITER, 1) if len(section_option) == 1: return var_id, None return section_option def get_id_from_section_option(section, option): """Return a variable id from a section and option.""" if option is None: return section return section + metomi.rose.CONFIG_DELIMITER + option def load_meta_path( config=None, directory=None, is_upgrade=False, locator=None, opt_meta_paths=None, no_warn=None, ): """Retrieve the path to the configuration metadata directory. 
Arguments: config - a rose config, perhaps with a meta= or project= flag directory - the directory of the rose config file is_upgrade - if True, load the path in an upgrade-specific way locator - a metomi.rose.resource.ResourceLocator instance. Returns the path to(or None) and a warning message (or None). """ if config is None: config = metomi.rose.config.ConfigNode() if no_warn is None: no_warn = [] warning = None if directory is not None and not is_upgrade: config_meta_dir = os.path.join(directory, metomi.rose.CONFIG_META_DIR) meta_file = os.path.join(config_meta_dir, metomi.rose.META_CONFIG_NAME) if os.path.isfile(meta_file): return config_meta_dir, warning if locator is None: if opt_meta_paths: paths = opt_meta_paths + sys.path else: paths = sys.path locator = metomi.rose.resource.ResourceLocator(paths=paths) opt_node = config.get( [metomi.rose.CONFIG_SECT_TOP, metomi.rose.CONFIG_OPT_META_TYPE], no_ignore=True, ) ignore_meta_error = opt_node is None if opt_node is None: opt_node = config.get( [metomi.rose.CONFIG_SECT_TOP, metomi.rose.CONFIG_OPT_PROJECT], no_ignore=True, ) if opt_node is None or not opt_node.value: meta_keys = ["rose-all"] else: key = str(opt_node.value) split_key = key.split('/') if len(split_key) == 1: key = '/'.join([key, metomi.rose.META_DEFAULT_VN_DIR]) meta_keys = [key] split_key = split_key if len(split_key) == 1 else split_key[:-1] if is_upgrade: meta_keys = ['/'.join(split_key)] else: default_key = '/'.join( split_key + [metomi.rose.META_DEFAULT_VN_DIR] ) if default_key != key: meta_keys.append(default_key) for i, meta_key in enumerate(meta_keys): path = os.path.join(meta_key, metomi.rose.META_CONFIG_NAME) if is_upgrade: path = meta_key try: meta_path = str(locator.locate(path)) except metomi.rose.resource.ResourceError: continue else: if not (ignore_meta_error or 'version' in no_warn) and i > 0: warning = ERROR_LOAD_CHOSEN_META_PATH.format( meta_keys[0], meta_keys[i] ) if is_upgrade: return meta_path, warning return 
os.path.dirname(meta_path), warning if not ignore_meta_error: warning = ERROR_LOAD_META_PATH.format(meta_keys[0]) return None, warning def load_meta_config_tree( config, directory=None, config_type=None, error_handler=None, ignore_meta_error=False, opt_meta_paths=None, no_warn=None, ): """Return the metadata config tree for a configuration.""" if opt_meta_paths: paths = opt_meta_paths + sys.path else: paths = sys.path if error_handler is None: error_handler = _report_error meta_list = ["rose-all/" + metomi.rose.META_CONFIG_NAME] if config_type is not None: default_meta_dir = config_type.replace(".", "-") meta_list.append(default_meta_dir + "/" + metomi.rose.META_CONFIG_NAME) config_meta_path, warning = load_meta_path( config, directory, opt_meta_paths=opt_meta_paths, no_warn=no_warn ) if warning is not None and not ignore_meta_error: error_handler(text=warning) locator = metomi.rose.resource.ResourceLocator(paths=paths) opt_node = config.get( [metomi.rose.CONFIG_SECT_TOP, metomi.rose.CONFIG_OPT_META_TYPE], no_ignore=True, ) ignore_meta_error = ignore_meta_error or opt_node is None meta_config_tree = None meta_config = metomi.rose.config.ConfigNode() for meta_key in meta_list: try: meta_path = str(locator.locate(meta_key)) except metomi.rose.resource.ResourceError: if not ignore_meta_error: error_handler(text=ERROR_LOAD_META_PATH.format(meta_key)) continue try: meta_config_tree = metomi.rose.config_tree.ConfigTreeLoader().load( os.path.dirname(meta_path), metomi.rose.META_CONFIG_NAME, conf_dir_paths=list(paths), conf_node=meta_config, ) except metomi.rose.config.ConfigSyntaxError as exc: error_handler(text=str(exc)) else: meta_config = meta_config_tree.node if config_meta_path is None: return meta_config_tree # Try and get a proper non-default meta config tree. 
try: meta_config_tree = metomi.rose.config_tree.ConfigTreeLoader().load( config_meta_path, metomi.rose.META_CONFIG_NAME, conf_dir_paths=list(paths), ) except metomi.rose.resource.ResourceError: if not ignore_meta_error: error_handler(text=ERROR_LOAD_META_PATH.format(meta_list)) except metomi.rose.config.ConfigSyntaxError as exc: error_handler(text=str(exc)) meta_config += meta_config_tree.node meta_config_tree.node = meta_config return meta_config_tree def load_meta_config( config, directory=None, config_type=None, error_handler=None, ignore_meta_error=False, ): """Return the metadata config for a configuration.""" config_tree = load_meta_config_tree( config, directory=directory, config_type=config_type, error_handler=error_handler, ignore_meta_error=ignore_meta_error, ) return config_tree.node def load_meta_macro_modules(meta_files, module_prefix=None): """Import metadata macros and return them in an array.""" modules = [] for meta_file in meta_files: meta_dir = os.path.dirname(meta_file) if not meta_dir.endswith(MACRO_DIRNAME) or not meta_file.endswith( MACRO_EXT ): continue sys.path.insert(0, meta_dir) macro_name = os.path.basename(meta_file).rpartition(MACRO_EXT)[0] if module_prefix is None: as_name = macro_name else: as_name = module_prefix + macro_name try: modules.append(SourceFileLoader(as_name, meta_file).load_module()) except Exception: metomi.rose.reporter.Reporter()( MacroLoadError(meta_file, traceback.format_exc()) ) sys.path.pop(0) modules.sort(key=str) return modules def get_macro_class_methods(macro_modules): """Return all macro methods in the modules.""" macro_methods = [] for macro_module in macro_modules: macro_name = macro_module.__name__ contents = inspect.getmembers(macro_module) for obj_name, obj in contents: if not inspect.isclass(obj): continue for att_name in ALLOWED_MACRO_CLASS_METHODS: if hasattr(obj, att_name) and callable(getattr(obj, att_name)): doc_string = obj.__doc__ macro_methods.append( (macro_name, obj_name, att_name, 
doc_string) ) macro_methods.sort(key=lambda x: x[1]) macro_methods.sort(key=lambda x: x[1]) # This logic replicates output of the deprecated Python2 `cmp` builtin macro_methods.sort( key=cmp_to_key(lambda x, y: (y[2] > x[2]) - (y[2] < x[2])) ) return macro_methods def get_macros_for_config( config=None, config_directory=None, return_modules=False, include_system=False, include_custom=True, no_warn=False, ): """Driver function to return macro names for a config object. kwargs: config - The config to retrieve macros for as a metomi.rose.config.ConfigNode config_directory - The directory that the config file is located in. return_modules - If true then a list of macro modules is also returned. include_system - Include default rose macros? include_custom - Include non-default rose macros? no_warn - Output metadata warnings? """ if config is None: config = ConfigNode() meta_config_tree = load_meta_config_tree( config, directory=config_directory, no_warn=no_warn ) if meta_config_tree is None: return [] modules = [] if include_custom: # Suite specified macros. meta_filepaths = [ os.path.join(v, k) for k, v in meta_config_tree.files.items() ] modules.extend(load_meta_macro_modules(meta_filepaths)) if include_system: # Default macros. import metomi.rose.macros # Done to avoid cyclic top-level imports. 
modules.append(metomi.rose.macros) if return_modules: return get_macro_class_methods(modules), modules return get_macro_class_methods(modules) def check_config_integrity(app_config): """Verify that the configuration is sane - return an error otherwise.""" try: keys_and_nodes = list(app_config.walk()) except Exception as exc: return MacroReturnedCorruptConfigError(str(exc)) keys_and_nodes.insert(0, ([], app_config)) for keys, node in keys_and_nodes: if not isinstance(node, metomi.rose.config.ConfigNode): return MacroReturnedCorruptConfigError( ERROR_RETURN_TYPE.format( node, "node", type(node), "rose.config.ConfigNode" ) ) if not isinstance(node.value, dict) and not isinstance( node.value, str ): return MacroReturnedCorruptConfigError( ERROR_RETURN_TYPE.format( node.value, "node.value", type(node.value), "dict, basestring", ) ) if not isinstance(node.state, str): return MacroReturnedCorruptConfigError( ERROR_RETURN_TYPE.format( node.state, "node.state", type(node.state), "basestring" ) ) if node.state not in [ metomi.rose.config.ConfigNode.STATE_NORMAL, metomi.rose.config.ConfigNode.STATE_SYST_IGNORED, metomi.rose.config.ConfigNode.STATE_USER_IGNORED, ]: return MacroReturnedCorruptConfigError( ERROR_RETURN_VALUE_STATE.format(node.state) ) if not isinstance(node.comments, list): return MacroReturnedCorruptConfigError( ERROR_RETURN_TYPE.format( node.comments, "node.comments", type(node.comments), "list" ) ) for comment in node.comments: if not isinstance(comment, str): return MacroReturnedCorruptConfigError( ERROR_RETURN_TYPE.format( comment, "comment", type(comment), "basestring" ) ) for key in keys: if not isinstance(key, str): return MacroReturnedCorruptConfigError( ERROR_RETURN_TYPE.format( key, "key", type(key), "basestring" ) ) def report_config( app_config, meta_config, run_macro_list, modules, macro_info_tuples, opt_non_interactive=False, optional_config_name=None, optional_values=None, validate_mode=True, ): """Run report/validator custom macros on the config 
and return problems (in the case of validator macros).""" if optional_values is None: optional_values = {} macro_problem_map = {} if validate_mode: macro_method = VALIDATE_METHOD else: macro_method = REPORT_METHOD for module_name, class_name, method, _ in macro_info_tuples: macro_name = ".".join([module_name, class_name]) if macro_name in run_macro_list and method == macro_method: for module in modules: if module.__name__ == module_name: macro_inst = getattr(module, class_name)() macro_meth = getattr(macro_inst, method) break res = {} if not opt_non_interactive: arglist = inspect.getfullargspec(macro_meth).args defaultlist = inspect.getfullargspec(macro_meth).defaults optionals = {} while defaultlist is not None and len(defaultlist) > 0: if arglist[-1] not in ["self", "config", "meta_config"]: optionals[arglist[-1]] = defaultlist[-1] arglist = arglist[0:-1] defaultlist = defaultlist[0:-1] else: break if optionals: update_optional_values( res, optionals, optional_values, optional_config_name ) if validate_mode: problem_list = macro_meth(app_config, meta_config, **res) if not isinstance(problem_list, list): raise ValueError(ERROR_RETURN_VALUE.format(macro_name)) if problem_list: macro_problem_map.update({macro_name: problem_list}) else: macro_meth(app_config, meta_config, **res) if validate_mode: return macro_problem_map def update_optional_values( res, optionals, optional_values, optional_config_name ): """Copy any relevant parameters into the 'res' dict.""" if "optional_config_name" in optionals: res["optional_config_name"] = optional_config_name del optionals["optional_config_name"] for key in set(optionals) & set(optional_values): optionals[key] = optional_values[key] res[key] = optional_values[key] res.update(get_user_values(optionals, res.keys())) optional_values.update(res) def transform_config( config, meta_config, transformer_macro, modules, macro_info_tuples, opt_non_interactive=False, optional_config_name=None, optional_values=None, ): """Run transformer 
custom macros on the config and return problems.""" if optional_values is None: optional_values = {} for module_name, class_name, method, _ in macro_info_tuples: if method != TRANSFORM_METHOD: continue macro_name = ".".join([module_name, class_name]) if macro_name != transformer_macro: continue for module in modules: if module.__name__ == module_name: macro_inst = getattr(module, class_name)() macro_method = getattr(macro_inst, method) break res = {} if not opt_non_interactive: arglist = inspect.getfullargspec(macro_method).args defaultlist = inspect.getfullargspec(macro_method).defaults optionals = {} while defaultlist is not None and len(defaultlist) > 0: if arglist[-1] not in ["self", "config", "meta_config"]: optionals[arglist[-1]] = defaultlist[-1] arglist = arglist[0:-1] defaultlist = defaultlist[0:-1] else: break if optionals: update_optional_values( res, optionals, optional_values, optional_config_name ) return macro_method(config, meta_config, **res) return config, [] def pretty_format_config(config, ignore_error=False): """Standardise the keys and values of a config node. Args: config (metomi.rose.config.ConfigNode): The Config node to convert. """ for s_key, s_node in config.value.items(): scheme = s_key if ":" in scheme: scheme = scheme.split(":", 1)[0] try: scheme_module = getattr(metomi.rose.formats, scheme) pretty_format_keys = getattr(scheme_module, "pretty_format_keys") pretty_format_value = getattr(scheme_module, "pretty_format_value") except AttributeError: continue for keylist, node in list(s_node.walk()): # FIXME: Surely, only the scheme knows how to split its array? 
values = metomi.rose.variable.array_split(node.value, ",") node.value = pretty_format_value(values) new_keylist = pretty_format_keys(keylist) if new_keylist != keylist: s_node.unset(keylist) s_node.set(new_keylist, node.value, node.state, node.comments) if ignore_error is False: _report_error( text=ERROR_MACRO_CASE_MISMATCH.format( keylist[1], new_keylist[1] ) ) sys.exit(0) def standard_format_config(config): """Standardise any degenerate representations e.g. namelist repeats. Args: config (metomi.rose.config.ConfigNode): The config node to convert. """ for keylist, node in config.walk(): if len(keylist) == 2: scheme = keylist[0] if ":" in scheme: scheme = scheme.split(":", 1)[0] try: scheme_module = getattr(metomi.rose.formats, scheme) standard_format = getattr(scheme_module, "standard_format") except AttributeError: continue values = metomi.rose.variable.array_split(node.value, ",") node.value = standard_format(values) def get_metadata_for_config_id(setting_id, meta_config): """Return a dict of metadata properties and values for a setting id. Args: setting_id (str): The name of the setting to extract metadata for. meta_config (metomi.rose.config.ConfigNode): Config node containing the metadata to extract from. Return: dict: A dictionary containing metadata options. Example: >>> # Create a rose app. >>> with open('rose-app.conf', 'w+') as app_config: ... _ = app_config.write(''' ... [foo] ... bar=2 ... ''') >>> os.mkdir('meta') >>> with open('meta/rose-meta.conf', 'w+') as meta_config: ... _ = meta_config.write(''' ... [foo=bar] ... values = 1,2,3 ... ''') ... >>> # Load config. >>> app_conf, config_map, meta_config = load_conf_from_file( ... '.', 'rose-app.conf') ... >>> # Extract metadata for foo=bar. >>> get_metadata_for_config_id('foo=bar', meta_config) {'values': '1,2,3', 'id': 'foo=bar'} .. 
testcleanup:: metomi.rose.macro.get_metadata_for_config_id test_cleanup(['rose-app.conf', 'meta/rose-meta.conf', 'meta']) """ metadata = {} if metomi.rose.CONFIG_DELIMITER in setting_id: option = setting_id.split(metomi.rose.CONFIG_DELIMITER, 1)[1] search_option = REC_ID_STRIP_DUPL.sub("", option) else: option = None search_option = None search_id = REC_ID_STRIP_DUPL.sub("", setting_id) no_modifier_id = REC_MODIFIER.sub("", search_id) if no_modifier_id != search_id: # There is a modifier e.g. namelist:foo{bar}. node = meta_config.get([no_modifier_id], no_ignore=True) # Get metadata for namelist:foo if node is not None: for opt, opt_node in node.value.items(): if not opt_node.is_ignored(): metadata.update({opt: opt_node.value}) if option is None and metomi.rose.META_PROP_TITLE in metadata: # Handle section modifier titles modifier = search_id.replace(no_modifier_id, "") metadata[metomi.rose.META_PROP_TITLE] += " " + modifier if ( setting_id != search_id and metomi.rose.META_PROP_DUPLICATE in metadata ): # foo{bar}(1) cannot inherit duplicate from foo. metadata.pop(metomi.rose.META_PROP_DUPLICATE) node = meta_config.get([search_id], no_ignore=True) # If modifier, get metadata for namelist:foo{bar} if node is not None: for opt, opt_node in node.value.items(): if not opt_node.is_ignored(): metadata.update({opt: opt_node.value}) if metomi.rose.META_PROP_TITLE in metadata: # Handle duplicate (indexed) settings sharing a title if option is None: if search_id != setting_id: # Handle duplicate sections titles metadata.pop(metomi.rose.META_PROP_TITLE) elif search_option != option: # Handle duplicate options titles index = option.replace(search_option, "") metadata[metomi.rose.META_PROP_TITLE] += " " + index if ( metomi.rose.META_PROP_LENGTH in metadata and option is not None and search_option != option and REC_ID_SINGLE_ELEMENT.search(option) ): # Option is a single element in an array, not a slice. 
metadata.pop(metomi.rose.META_PROP_LENGTH) metadata.update({'id': setting_id}) return metadata def run_macros( config_map, meta_config, config_name, macro_names, opt_conf_dir=None, opt_fix=False, opt_non_interactive=False, opt_output_dir=None, opt_validate_all=False, opt_transform_all=False, verbosity=None, no_warn=False, default_only=False, ): """Run standard or custom macros for a configuration.""" reporter = metomi.rose.reporter.Reporter(verbosity) macro_tuples, modules = get_macros_for_config( config_map[None], opt_conf_dir, return_modules=True, include_system=True, include_custom=not default_only, no_warn=no_warn, ) # Add all macros to the run list as specified. methods = [] if opt_validate_all: methods.append(VALIDATE_METHOD) if opt_transform_all or opt_fix: methods.append(TRANSFORM_METHOD) macros_by_type = {} for macro_method in methods: macros_by_type[macro_method] = [] for module_name, class_name, method, _ in macro_tuples: if opt_fix and not opt_transform_all: # Only include internal transformer macros for # metomi.rose macro --fix. if module_name != metomi.rose.macros.__name__: continue if method == macro_method: macro_name = ".".join([module_name, class_name]) macros_by_type[macro_method].append(macro_name) if not macros_by_type[macro_method]: return True # List all macros if none are given. if not macro_names and not [ macros_by_type[method] for method in macros_by_type ]: for module_name, class_name, method, help_ in macro_tuples: macro_name = ".".join([module_name, class_name]) macro_id = MACRO_OUTPUT_ID.format(method.upper()[0], macro_name) if help_: reporter(macro_id + "\n", prefix="") for help_line in help_.split("\n"): reporter( MACRO_OUTPUT_HELP.format(help_line), level=reporter.V, prefix="", ) else: # No "help" docstring provided in macro. reporter( ERROR_NO_MACRO_HELP.format(macro_name), level=reporter.FAIL, prefix=reporter.PREFIX_FAIL, ) return False return True # Categorise macros given as arguments. 
macros_not_found = [m for m in macro_names] for module_name, class_name, method, _ in macro_tuples: this_macro_name = ".".join([module_name, class_name]) this_macro_method_name = ".".join([this_macro_name, method]) if this_macro_name in macro_names: macros_by_type.setdefault(method, []) macros_by_type[method].append(this_macro_name) if this_macro_name in macros_not_found: macros_not_found.remove(this_macro_name) elif this_macro_method_name in macro_names: macros_by_type.setdefault(method, []) macros_by_type[method].append(this_macro_name) if this_macro_method_name in macros_not_found: macros_not_found.remove(this_macro_method_name) for macro_name in macros_not_found: reporter(MacroNotFoundError(macro_name)) if macros_not_found: return False ret_code = 0 # Run any validator macros. if VALIDATE_METHOD in macros_by_type: new_combined_config_map = combine_opt_config_map(config_map) macro_config_problems_map = {} optional_values = {} for conf_key, config in new_combined_config_map.items(): config_problems_map = report_config( config, meta_config, macros_by_type[VALIDATE_METHOD], modules, macro_tuples, opt_non_interactive, optional_config_name=conf_key, optional_values=optional_values, validate_mode=True, ) if config_problems_map: ret_code = 1 for macro, problem_list in config_problems_map.items(): macro_config_problems_map.setdefault(macro, {}) problem_list.sort(key=cmp_to_key(report_sort)) macro_config_problems_map[macro][conf_key] = problem_list problem_macros = list(macro_config_problems_map) problem_macros.sort() for macro_name in problem_macros: config_problems_map = macro_config_problems_map[macro_name] method_id = VALIDATE_METHOD.upper()[0] macro_id = MACRO_OUTPUT_ID.format(method_id, macro_name) reporter( get_reports_as_text( config_problems_map, macro_id, is_from_transform=False ), level=reporter.V, kind=reporter.KIND_ERR, prefix="", ) # Run any report macros. 
if REPORT_METHOD in macros_by_type: new_combined_config_map = combine_opt_config_map(config_map) optional_values = {} for conf_key, config in new_combined_config_map.items(): report_config( config, meta_config, macros_by_type[REPORT_METHOD], modules, macro_tuples, opt_non_interactive, optional_config_name=conf_key, optional_values=optional_values, validate_mode=False, ) # Run any transform macros. no_changes = True if TRANSFORM_METHOD in macros_by_type: no_changes = no_changes and _run_transform_macros( macros_by_type[TRANSFORM_METHOD], config_name, config_map, meta_config, modules, macro_tuples, opt_non_interactive=opt_non_interactive, opt_conf_dir=opt_conf_dir, opt_output_dir=opt_output_dir, reporter=reporter, ) if not ret_code and no_changes: reporter(MacroFinishNothingEvent()) return ret_code == 0 def report_sort(report1, report2): """Sort MacroReport objects by section and option.""" sect1 = report1.section sect2 = report2.section if sect1 == sect2: opt1 = report1.option opt2 = report2.option if opt1 is None or opt2 is None: # This logic replicates output of the deprecated Python2 `cmp` # builtin return (str(opt1) > str(opt2)) - (str(opt1) < str(opt2)) return metomi.rose.config.sort_settings(opt1, opt2) return metomi.rose.config.sort_settings(sect1, sect2) def get_reports_as_text(config_reports_map, macro_id, is_from_transform=False): """Translate reports into nicely formatted text.""" text = "" config_warnings_list = [] config_issues_list = [] main_reports = set(config_reports_map.get(None, [])) conf_keys = list(config_reports_map) conf_keys = sorted(conf_keys, key=lambda x: x is not None) for conf_key in conf_keys: reports = config_reports_map[conf_key] for rep in reports: # MacroReport instance if conf_key is not None and rep in main_reports: # Don't repeat reports about the main configuration. 
continue if rep.is_warning: config_warnings_list.append((conf_key, rep)) else: config_issues_list.append((conf_key, rep)) if is_from_transform: header = MACRO_OUTPUT_TRANSFORM_CHANGES else: header = MACRO_OUTPUT_VALIDATE_ISSUES header = header.format(macro_id, len(config_issues_list)) text = header for origin, rep in config_issues_list: origin_label = get_config_label(origin) out = PROBLEM_ENTRY.format( origin_label, rep.section, rep.option, rep.value, rep.info ) text += out if config_warnings_list: header = MACRO_OUTPUT_WARNING_ISSUES header = header.format(macro_id, len(config_warnings_list)) text += header for origin, rep in config_warnings_list: origin_label = get_config_label(origin) out = PROBLEM_ENTRY.format( origin_label, rep.section, rep.option, rep.value, rep.info ) text += out return text def get_config_label(config_key): """Return an output-suitable representation of the config_key.""" if not config_key: return "" return OPT_CONFIG_REPORT.format(config_key) def handle_transform( config_map, new_config_map, change_map, macro_id, opt_conf_dir, opt_output_dir, opt_non_interactive, reporter, ): """Prompt the user to go ahead with macro changes and dump the output.""" has_changes = False for change_list in change_map.values(): for report in change_list: if not report.is_warning: has_changes = True break if has_changes: break reporter( get_reports_as_text(change_map, macro_id, is_from_transform=True), level=reporter.V, prefix="", ) if has_changes and (opt_non_interactive or _get_user_accept()): for conf_key, config in new_config_map.items(): dump_config( config, opt_conf_dir, opt_output_dir, conf_key=conf_key ) if reporter is not None: reporter( MacroTransformDumpEvent(opt_conf_dir, opt_output_dir), level=reporter.VV, ) return True return False def combine_opt_config_map(config_map): """Combine optional configurations with a main configuration.""" new_combined_config_map = {} main_config = config_map[None] for conf_key, config in config_map.items(): if 
conf_key is None: new_combined_config_map[None] = copy.deepcopy(config) continue new_config = copy.deepcopy(main_config) for keylist, subnode in config.walk(): old_subnode = new_config.get(keylist) if ( isinstance(subnode.value, dict) and old_subnode is not None and isinstance(old_subnode.value, dict) ): old_subnode.state = subnode.state old_subnode.comments = subnode.comments else: new_config.set( keylist, value=copy.deepcopy(subnode.value), state=subnode.state, comments=subnode.comments, ) new_combined_config_map[conf_key] = new_config return new_combined_config_map def _run_transform_macros( macros, config_name, config_map, meta_config, modules, macro_tuples, opt_non_interactive=False, opt_conf_dir=None, opt_output_dir=None, reporter=None, ): no_changes = True combined_config_map = combine_opt_config_map(config_map) optional_values = {} for transformer_macro in macros: macro_function = lambda conf, meta, opt: transform_config( conf, meta, transformer_macro, modules, macro_tuples, opt_non_interactive, optional_config_name=opt, optional_values=optional_values, ) new_config_map, changes_map = apply_macro_to_config_map( combined_config_map, meta_config, macro_function, macro_name=transformer_macro, ) method_id = TRANSFORM_METHOD.upper()[0] macro_id = MACRO_OUTPUT_ID.format(method_id, transformer_macro) if handle_transform( config_map, new_config_map, changes_map, macro_id, opt_conf_dir, opt_output_dir, opt_non_interactive, reporter, ): combined_config_map = new_config_map no_changes = False return no_changes def apply_macro_to_config_map( config_map, meta_config, macro_function, macro_name=None ): """Apply a transform macro function to a config_map.""" new_config_map = {} changes_map = {} conf_keys = list(config_map) conf_keys = sorted(conf_keys, key=lambda x: x is not None) for conf_key in conf_keys: config = config_map[conf_key] macro_config = copy.deepcopy(config) return_value = macro_function(macro_config, meta_config, conf_key) err_bad_return_value = 
ERROR_RETURN_VALUE.format(macro_name) if not isinstance(return_value, tuple) or len(return_value) != 2: raise ValueError(err_bad_return_value) new_config, change_list = return_value if not isinstance( new_config, metomi.rose.config.ConfigNode ) or not isinstance(change_list, list): raise ValueError(err_bad_return_value) exception = check_config_integrity(new_config) if exception is not None: raise exception changes_map[conf_key] = change_list if conf_key is None: # Always the first item. new_config_map[conf_key] = new_config else: diff = new_config - new_config_map[None] new_opt_config = diff.get_as_opt_config() new_config_map[conf_key] = new_opt_config return new_config_map, changes_map def _get_user_accept(): try: user_input = input(PROMPT_ACCEPT_CHANGES) except EOFError: user_allowed_changes = False else: user_allowed_changes = user_input == PROMPT_OK return user_allowed_changes def get_user_values(options, ignore=None): if ignore is None: ignore = [] for key, val in options.items(): if key in ignore: continue entered = False while not entered: try: user_input = input( "Value for " + str(key) + " (default " + str(val) + "): " ) except EOFError: user_input = "" entered = True if len(user_input) > 0: try: options[key] = ast.literal_eval(user_input) entered = True except (SyntaxError, ValueError): metomi.rose.reporter.Reporter()( "Invalid entry: Input should be a valid python " "value.\nNote that strings should be quoted. 
def dump_config(
    config,
    opt_conf_dir,
    opt_output_dir=None,
    conf_key=None,
    name=metomi.rose.SUB_CONFIG_NAME,
):
    """Dump the config in a standard form.

    A deep copy of ``config`` is passed through ``pretty_format_config``
    and written out, so the object handed in by the caller is left as is.

    Arguments:
        config - The configuration to dump.
        opt_conf_dir - Directory to write to when opt_output_dir is None.
        opt_output_dir - Directory to write to; used instead of
            opt_conf_dir whenever it is not None.
        conf_key - If given, the file is written inside the optional
            configuration sub-directory and "-<conf_key>" is inserted
            between the root and the extension of ``name``.
        name - File name of the dumped configuration.

    """
    config = copy.deepcopy(config)
    pretty_format_config(config)
    directory = opt_conf_dir if opt_output_dir is None else opt_output_dir
    if conf_key is None:
        target_path = os.path.join(directory, name)
    else:
        source_root, source_ext = os.path.splitext(name)
        base = source_root + "-" + conf_key + source_ext
        target_path = os.path.join(
            directory, metomi.rose.config.OPT_CONFIG_DIR, base
        )
    metomi.rose.config.dump(config, target_path)


def load_conf_from_file(conf_dir, config_file_path, mode="macro"):
    """Load config data from the file config_file_path.

    Arguments:
        conf_dir - Directory holding the configuration; its optional
            configuration sub-directory is globbed for optional keys.
        config_file_path - Path of the configuration file to load.
        mode - When "macro", a missing metadata path (for anything other
            than the info configuration) is reported as an error and the
            load is abandoned.

    Returns:
        A tuple (app_config, config_map, meta_config), or None if the
        metadata path could not be found while in "macro" mode.

    """
    is_info_config = (
        os.path.basename(config_file_path) == metomi.rose.INFO_CONFIG_NAME
    )

    # Collect the keys of the optional configurations found in conf_dir.
    optional_keys = []
    optional_dir = os.path.join(conf_dir, metomi.rose.config.OPT_CONFIG_DIR)
    optional_glob = os.path.join(
        optional_dir, metomi.rose.GLOB_OPT_CONFIG_FILE
    )
    for path in glob.glob(optional_glob):
        filename = os.path.basename(path)
        # filename is a null string if path is to a directory.
        result = re.match(metomi.rose.RE_OPT_CONFIG_FILE, filename)
        if not result:
            continue
        optional_keys.append(result.group(1))

    # Load the configuration and the metadata macros.
    config_loader = metomi.rose.config.ConfigLoader()
    if is_info_config:
        # The info configuration is loaded without optional keys.
        optional_keys = None
    app_config, config_map = config_loader.load_with_opts(
        config_file_path, more_keys=optional_keys, return_config_map=True
    )
    standard_format_config(app_config)
    for config in config_map.values():
        standard_format_config(config)

    # Load meta config if it exists.
    # An empty node is returned when no metadata is loaded below.
    meta_config = metomi.rose.config.ConfigNode()
    meta_path, warning = load_meta_path(app_config, conf_dir)
    if meta_path is None and not is_info_config:
        if mode == "macro":
            text = ERROR_LOAD_METADATA.format("")
            if warning:
                text = warning
            metomi.rose.reporter.Reporter()(
                text,
                kind=metomi.rose.reporter.Reporter.KIND_ERR,
                level=metomi.rose.reporter.Reporter.FAIL,
            )
            return None
    else:
        meta_config = load_meta_config(
            app_config,
            directory=conf_dir,
            config_type=os.path.basename(config_file_path),
            ignore_meta_error=True,
        )
    return app_config, config_map, meta_config


def parse_macro_args():
    """Parse options/arguments for rose macro and upgrade.

    Returns:
        A tuple (opts, args) with ``opts.conf_dir`` (defaulting to the
        current working directory) and ``opts.output_dir`` made absolute,
        or None, after writing the usage to stderr, if validation of all
        macros was requested together with an output directory.

    """
    opt_parser = RoseOptionParser(
        usage='rose macro [OPTIONS] [MACRO_NAME ...]',
        description='''
List or run macros associated with a suite or application.

Macros are listed/run according to the config dir (`$PWD` unless
`--config=DIR` is set):

* If the config dir is an app directory (or is within an app directory)
  macros will be listed/run for the `rose-app.conf` file of that app.
* Otherwise macros will be listed/run for the `rose-suite.conf`,
  `rose-suite.info` and (unless `--suite-only` is set) all
  `rose-app.conf` files.

If a configuration contains optional configurations:

* For validator macros, validate the main configuration, then
  validate each main + optional configuration in turn.
* For transform macros, transform the main configuration, then
  transform each main + optional configuration, recreating each
  optional configuration as the diff vs the transformed main.
        ''',
        epilog='''
ARGUMENTS
    MACRO_NAME ...
        A list of macro names to run. If no macro names are specified
        and `--fix`, `--validate` are not used, list all available macros.
        Otherwise, run the specified macro names.

ENVIRONMENT VARIABLES
    optional ROSE_META_PATH
        Prepend `$ROSE_META_PATH` to the metadata search path.
        '''
    )
    opt_parser.add_my_options(
        "conf_dir",
        "meta_path",
        "non_interactive",
        "output_dir",
        "fix",
        "validate_all",
        "no_warn",
        "suite_only",
        "transform_all",
    )
    opt_parser.modify_option(
        'output_dir',
        help=(
            'The location of the output directory.'
            # Trailing space keeps "the" and "argument" apart once the
            # adjacent literals are concatenated.
            '\nOnly meaningful if there is at least one transformer in the '
            'argument list.'
        ),
    )
    opts, args = opt_parser.parse_args()
    if opts.validate_all and opts.output_dir:
        # These two options are rejected in combination: show usage only.
        sys.stderr.write(opt_parser.get_usage())
        return None
    if opts.conf_dir is None:
        opts.conf_dir = os.getcwd()
    opts.conf_dir = os.path.abspath(opts.conf_dir)
    if opts.output_dir is not None:
        opts.output_dir = os.path.abspath(opts.output_dir)
    return opts, args


def _report_error(exception=None, text=""):
    """Report an error via metomi.rose.reporter utilities.

    Arguments:
        exception - Optional exception; "<TypeName>: <message>" is appended
            to the reported text.
        text - Optional leading text for the report.

    """
    if text:
        text += "\n"
    if exception is not None:
        text += type(exception).__name__ + ": " + str(exception) + "\n"
    metomi.rose.reporter.Reporter()(
        text + "\n",
        kind=metomi.rose.reporter.Reporter.KIND_ERR,
        level=metomi.rose.reporter.Reporter.FAIL,
    )


def scan_rose_directory(conf_dir, suite_only=False):
    """Returns a list of rose config files found within the given conf_dir.

    * If the conf_dir is an application directory then return only the
      application configuration file
    * If the conf_dir is within the suite directory but above any
      application directory then return all application configs along with
      the suite and info configs.
    * Return None otherwise.

    The search starts at conf_dir and walks up one directory at a time
    until a match is found or the top of the path is reached.

    Arguments:
        conf_dir - The directory to scan.
        suite_only - If True only return suite and info config files.

    """
    path = conf_dir
    while True:
        lstdir = set(os.listdir(path))
        if metomi.rose.TOP_CONFIG_NAME in lstdir:
            # We are in the suite directory.
            confs = []
            if not suite_only:
                # Add app/*/rose-app.conf files.
                confs = sorted(
                    glob.glob(
                        os.path.join(
                            path,
                            metomi.rose.SUB_CONFIGS_DIR,
                            '*',
                            metomi.rose.SUB_CONFIG_NAME,
                        )
                    )
                )
            # Add metomi.rose-suite.conf file.
            confs.append(os.path.join(path, metomi.rose.TOP_CONFIG_NAME))
            # Add metomi.rose-suite.info file.
            if metomi.rose.INFO_CONFIG_NAME in lstdir:
                confs.append(os.path.join(path, metomi.rose.INFO_CONFIG_NAME))
            return confs
        elif not suite_only and metomi.rose.SUB_CONFIG_NAME in lstdir:
            # We are in an app directory. Return only that app.
            return [os.path.join(path, metomi.rose.SUB_CONFIG_NAME)]
        # Go up a directory.
        path = os.path.dirname(path)
        if path == os.path.dirname(path):
            # We don't support suites located at the root!
            break
    return None


def main():
    """Run metomi.rose macro.

    Exits with status 0 if every macro run succeeded and 1 otherwise,
    including when the arguments are rejected, no configuration is found,
    or a configuration cannot be loaded.

    """
    reporter = metomi.rose.reporter.Reporter()
    add_meta_paths()
    parsed = parse_macro_args()
    if parsed is None:
        # parse_macro_args has already written the usage to stderr.
        sys.exit(1)
    opts, args = parsed

    # Get list of apps to evaluate.
    confs = scan_rose_directory(opts.conf_dir, suite_only=opts.suite_only)

    # Fail if no config files could be found.
    if not confs:
        reporter(
            ERROR_LOAD_CONFIG_DIR.format(opts.conf_dir),
            kind=metomi.rose.reporter.Reporter.KIND_ERR,
            level=metomi.rose.reporter.Reporter.FAIL,
        )
        sys.exit(1)

    # Fail if --output-dir specified and multiple config files found.
    if len(confs) > 1 and opts.output_dir:
        reporter(
            ERROR_OUT_DIR_MULTIPLE_APPS,
            kind=metomi.rose.reporter.Reporter.KIND_ERR,
            level=metomi.rose.reporter.Reporter.FAIL,
        )
        sys.exit(1)

    # Path manipulation.
    add_opt_meta_paths(opts.meta_path)

    # Run macros for each config.
    verbosity = 1 + opts.verbosity - opts.quietness
    ret = [True]
    for config_file_path in confs:
        # Macro info.
        conf_dir = os.path.dirname(config_file_path)
        cur_conf_type = os.path.basename(config_file_path)
        config_name = os.path.basename(conf_dir)
        os.chdir(conf_dir)

        # Load config.
        # A None result means the load failed and was already reported;
        # test for it directly so that a genuine TypeError raised while
        # loading is not silently turned into a bare exit.
        loaded = load_conf_from_file(conf_dir, config_file_path)
        if loaded is None:
            sys.exit(1)
        _, config_map, meta_config = loaded

        # Report which config we are currently working on.
        if len(confs) > 1:
            if cur_conf_type == metomi.rose.SUB_CONFIG_NAME:
                reporter(
                    os.path.join(
                        metomi.rose.SUB_CONFIGS_DIR, config_name, cur_conf_type
                    )
                )
            else:
                reporter(cur_conf_type)
        sys.stdout.flush()

        # Run macros.
        ret.append(
            run_macros(
                config_map,
                meta_config,
                config_name,
                list(args),
                conf_dir,
                opts.fix,
                opts.non_interactive,
                opts.output_dir,
                opts.validate_all,
                opts.transform_all,
                verbosity,
                no_warn=opts.no_warn,
                default_only=cur_conf_type == metomi.rose.INFO_CONFIG_NAME,
            )
        )

    # Fail if any macro failed.
    sys.exit(0 if all(ret) else 1)


if __name__ == "__main__":
    main()