# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Builtin application: "rose ana", a comparison engine for Rose."""
# Standard Python modules
import abc
from contextlib import contextmanager
import fcntl
import glob
import inspect
import os
import re
import sqlite3
import sys
import threading
import time
import traceback
# Rose modules
from metomi.rose import TYPE_LOGICAL_VALUE_TRUE
from metomi.rose.app_run import BuiltinApp
from metomi.rose.env import env_var_process
from metomi.rose.reporter import Reporter
from metomi.rose.resource import ResourceLocator
def timestamp():
    """Return the current time as ``HH:MM:SS``.

    This is a more concise form than ``time.asctime``.
    """
    clock_format = "%H:%M:%S"
    return time.strftime(clock_format)
class KGODatabase:
    """
    KGO Database object, stores comparison information for metomi.rose_ana
    apps.

    SQL commands are queued in ``statement_buffer`` as ``(statement, args)``
    tuples by :meth:`enter_task` and :meth:`enter_comparison`; nothing is
    written to disk until :meth:`buffer_to_db` is called.
    """

    # This SQL command ensures a "comparisons" table exists in the database
    # and then populates it with a series of columns (which in this case
    # are all storing strings/text). The primary key is the comparison name
    # (as it must uniquely identify each row)
    CREATE_COMPARISON_TABLE = """
        CREATE TABLE IF NOT EXISTS comparisons (
        comp_task TEXT,
        kgo_file TEXT,
        suite_file TEXT,
        status TEXT,
        comparison TEXT,
        PRIMARY KEY(comp_task))
        """

    # This SQL command ensures a "tasks" table exists in the database
    # and then populates it with a pair of columns (the task name and
    # a completion status indicator). The primary key is the task name
    # (as it must uniquely identify each row)
    CREATE_TASKS_TABLE = """
        CREATE TABLE IF NOT EXISTS tasks (
        task_name TEXT,
        completed INT,
        PRIMARY KEY(task_name))
        """

    # Task statuses
    TASK_STATUS_RUNNING = 1
    TASK_STATUS_SUCCEEDED = 0

    def __init__(self):
        "Initialise the object."
        # Pending (statement, args) pairs awaiting buffer_to_db
        self.statement_buffer = []
        # Placeholder until enter_task records the real task name
        self.task_name = "task_name not set"

    def enter_comparison(
        self, comp_task, kgo_file, suite_file, status, comparison
    ):
        """Add a command to insert a new comparison entry to the database.

        The row is keyed on ``"<task_name> - <comp_task>"`` where
        ``task_name`` is whatever was last passed to :meth:`enter_task`.
        """
        # This SQL command indicates that a single "row" is to be entered into
        # the "comparisons" table
        sql_statement = (
            "INSERT OR REPLACE INTO comparisons VALUES (?, ?, ?, ?, ?)"
        )
        # Prepend the task_name onto each entry, to try and ensure it is
        # unique (the individual comparison names may not be, but the
        # rose task name + the comparison task name should)
        sql_args = [
            self.task_name + " - " + comp_task,
            kgo_file,
            suite_file,
            status,
            comparison,
        ]
        # Add the command and arguments to the buffer
        self.statement_buffer.append((sql_statement, sql_args))

    def enter_task(self, task_name, status):
        """Add a command to insert a new task entry to the database.

        Also remembers ``task_name`` so that later comparison entries can
        be prefixed with it.
        """
        # This SQL command indicates that a single "row" is to be entered into
        # the "tasks" table
        sql_statement = "INSERT OR REPLACE INTO tasks VALUES (?, ?)"
        sql_args = [task_name, status]
        # Add the command and arguments to the buffer
        self.statement_buffer.append((sql_statement, sql_args))
        # Save the name for use in any comparisons later
        self.task_name = task_name

    @contextmanager
    def database_lock(self, lockfile, reporter=None):
        """Context manager which obtains an exclusive file-based lock.

        The lock is released and the lock file closed when the ``with``
        block exits, including when it exits because of an exception.

        Args:
            lockfile: path of the file to lock (opened for writing).
            reporter: optional callable used to report progress messages.
        """
        with open(lockfile, "w") as lock:
            fcntl.flock(lock, fcntl.LOCK_EX)
            try:
                if reporter is not None:
                    reporter("Acquired DB lock at: " + timestamp())
                lock.write("{0}".format(os.getpid()))
                # Flush so the PID is in the file while the lock is held,
                # rather than only once the file is closed
                lock.flush()
                if reporter is not None:
                    reporter("Writing to KGO Database...")
                yield
            finally:
                # Always unlock, otherwise an error in the caller's block
                # would leave the lock held for as long as the file object
                # stays alive
                fcntl.flock(lock, fcntl.LOCK_UN)
                if reporter is not None:
                    reporter("Released DB lock at: " + timestamp())

    def buffer_to_db(self, reporter=None):
        """Flush the buffer; executing all the commands in one transaction.

        The database lives at ``$ROSE_SUITE_DIR/log/rose-ana-comparisons.db``
        and is guarded by a sibling ``.lock`` file. Does nothing when the
        buffer is empty. The buffer is only emptied after a successful
        commit.

        Args:
            reporter: optional reporter; when given, each buffered statement
                is reported at its ``V`` level along with locking progress.
        """
        # If the buffer is empty, there isn't anything to do
        if not self.statement_buffer:
            return

        # Otherwise locate the database
        file_name = os.path.join(
            os.getenv("ROSE_SUITE_DIR"), "log", "rose-ana-comparisons.db"
        )
        lock_name = file_name + ".lock"

        if reporter is not None:
            for statement in self.statement_buffer:
                reporter(str(statement), level=reporter.V)

        # Acquire a file lock to ensure exclusive access, then connect to the
        # database and commit each command from the buffer
        with self.database_lock(lock_name, reporter):
            conn = sqlite3.connect(file_name, timeout=60.0)
            try:
                # Ensure that the tables exist
                conn.execute(self.CREATE_COMPARISON_TABLE)
                conn.execute(self.CREATE_TASKS_TABLE)
                # Apply each command from the buffer
                for statement, args in self.statement_buffer:
                    conn.execute(statement, args)
                # Finalise the database
                conn.commit()
            finally:
                # Close the connection explicitly so its file handle is
                # released before the lock is given up
                conn.close()

        # Empty the buffer in case it gets re-used
        self.statement_buffer = []
[docs]
class AnalysisTask(metaclass=abc.ABCMeta):
    """Base class for an analysis task.

    Every custom user task should derive from this class and supply its
    own ``run_analysis`` method, which carries out the actual analysis.

    Attributes:
        self.options:
            The dictionary of options given to this particular task.
        self.config:
            A dictionary containing any Rose Ana configuration options.
        self.reporter:
            A reference to the :py:class:`metomi.rose.reporter.Reporter`
            instance used by the parent app (for printing to stderr/stdout).
        self.kgo_db:
            A reference to the KGO database object created by the parent app
            (for adding entries to the database).
        self.popen:
            A reference to the :py:class:`metomi.rose.popen.RosePopener`
            instance used by the parent app (for spawning subprocesses).
        self.passed:
            Initially False; set to True by the task when it has passed.
        self.skipped:
            Initially False; set to True by the task when it was skipped.

    """

    def __init__(self, parent_app, task_options):
        """
        Set up the task: keep the user supplied options dictionary and
        pick out the useful objects owned by the parent app.
        """
        # Outcome flags; both start out unset
        self.passed = False
        self.skipped = False

        self.options = task_options

        # Direct access to the parent app is kept purely for backwards
        # compatibility and will be removed in the future (please use the
        # dedicated attributes below instead!)
        self.parent = parent_app

        # Shortcuts to the relevant parts of the parent app's environment
        # (printing, database access, running subprocesses, etc.)
        self.config = parent_app.ana_config
        self.reporter = parent_app.reporter
        self.kgo_db = parent_app.kgo_db
        self.popen = parent_app.app_runner.popen

    @abc.abstractmethod
    def run_analysis(self):
        """
        Entry point invoked to start the analysis; subclasses must override
        this with the code which performs the desired analysis.
        """
        raise ValueError(
            "Abstract analysis task class should never be called directly"
        )

    def process_opt_unhandled(self):
        """
        Options are expected to be removed from the options dictionary as
        they are processed; calling this afterwards catches any unknown
        options which remain (a ValueError is raised listing them).
        """
        always_present = ("full_task_name", "description")
        leftover = [
            name for name in self.options if name not in always_present
        ]
        if not leftover:
            return
        template = (
            "Options provided but not understood for this "
            "analysis type: {0}"
        )
        raise ValueError(template.format(leftover))
class TaskRunner(threading.Thread):
    """Thread which runs a single analysis task and records its outcome.

    Messages the task reports are not printed directly; they are stored in
    ``reporter_args`` so the parent app can output them later. The outcome
    is exposed through ``skips``, ``failures`` (each 0 or 1),
    ``task_error`` and ``summary_status``.
    """

    def __init__(self, app, index, task):
        super().__init__()
        self.name = "Task-{0}".format(index + 1)
        self.app = app
        self.index = index
        self.task = task
        # Outcome counters and flags, filled in by run()
        self.skips = 0
        self.failures = 0
        self.task_error = False
        self.summary_status = []
        # Captured (message, kind, level, prefix, clip) reporter calls
        self.reporter_args = []

    def run(self):
        """Run the task, capturing its messages and pass/skip/fail status."""
        # Give the task a private reporter whose handler only stores the
        # arguments of each call
        verbosity = self.app.opts.verbosity - self.app.opts.quietness
        reporter = Reporter(verbosity)

        def capture(message, kind, level, prefix, clip):
            self.reporter_args.append((message, kind, level, prefix, clip))

        reporter.event_handler = capture
        self.task.reporter = reporter

        def record(text, prefix):
            # Store the summary line, tagged with the full task name
            label = "{0} ({1})".format(
                text, self.task.options["full_task_name"]
            )
            self.summary_status.append((label, prefix))

        number = self.index + 1

        # Banner line plus task details to aid readability.
        self.app.titlebar("Running task #{0}".format(number), reporter)
        reporter("Method: {0}".format(self.task.options["full_task_name"]))
        reporter(
            "Thread ID {0} starting at {1}".format(self.ident, timestamp())
        )

        # run_analysis is frequently user code outside of rose's control,
        # and there is no way of knowing which exceptions it might raise,
        # hence the blanket try/except.
        try:
            self.task.run_analysis()
            # No exception was raised, so work out how the task fared.
            if self.task.passed:
                prefix = self.app._prefix_pass
                msg = "Task #{0} passed at {1}".format(number, timestamp())
            elif self.task.skipped:
                self.skips = 1
                prefix = self.app._prefix_skip
                msg = "Task #{0} skipped by method".format(number)
            else:
                self.failures = 1
                prefix = self.app._prefix_fail
                msg = "Task #{0} did not pass at {1}".format(
                    number, timestamp()
                )
            record(msg, prefix)
            reporter(msg, prefix=prefix)
        except Exception:
            # An exception counts as a failure; the traceback is reported
            # on the error stream.
            self.task_error = True
            self.failures = 1
            fail_prefix = self.app._prefix_fail
            msg = "Task #{0} encountered an error at {1}".format(
                number, timestamp()
            )
            record(msg, fail_prefix)
            reporter(msg + " (see stderr)", prefix=fail_prefix)
            trace_text = traceback.format_exc()
            reporter(msg, prefix=fail_prefix, kind=reporter.KIND_ERR)
            reporter(trace_text, prefix=fail_prefix, kind=reporter.KIND_ERR)
class RoseAnaApp(BuiltinApp):
    """Run rose ana as an application."""

    SCHEME = "rose_ana"
    _prefix_pass = "[ OK ] "
    _prefix_skip = "[SKIP] "
    _prefix_fail = "[FAIL] "
    _printbar_width = 80
    # Pause (in seconds) between checks for a free slot while the maximum
    # number of analysis threads is already running
    _thread_poll_interval = 0.1

    def run(self, app_runner, conf_tree, opts, args, uuid, work_files):
        """Implement the "rose ana" command.

        Loads the available analysis methods and the tasks defined in the
        app, runs every task (serially, or in threads when the "threads"
        setting is greater than 1), optionally records the task status in
        the KGO database and finally reports a summary.

        Returns:
            The result of the old-style (v1) app when the app config has no
            "ana:" sections; otherwise None.

        Raises:
            ValueError: if the configured number of threads is below 1.
            TestsFailedException: if any task failed or raised an error, or
                if the number of skipped tasks equals the number processed.
        """
        # Initialise properties which will be needed later.
        self._task = app_runner.suite_engine_proc.get_task_props()
        self.task_name = self._task.task_name
        self.opts = opts
        self.args = args
        self.config = conf_tree.node
        self.app_runner = app_runner

        # Attach to the main rose config (for retrieving settings from
        # things like the user's ~/.metomi/rose.conf)
        self.rose_conf = ResourceLocator.default().get_conf()

        # Attach to a reporter instance for sending messages.
        self._init_reporter(app_runner.event_handler)

        # As part of the introduction of a re-written rose_ana,
        # backwards compatibility is maintained here by detecting the lack of
        # the newer syntax in the app config and falling back to the old
        # version of the rose_ana app (renamed to rose_ana_v1)
        # **Once the old behaviour is removed the below block can be too**.
        new_style_app = any(
            keys[0].startswith("ana:")
            for keys, _ in self.config.walk(no_ignore=True)
        )
        if not new_style_app:
            # Use the previous app by instantiating and calling it explicitly
            self.reporter(
                "!!WARNING!! - Detected old style rose_ana app; "
                "Using previous rose_ana version..."
            )
            from metomi.rose.apps.rose_ana_v1 import RoseAnaV1App

            old_app = RoseAnaV1App(manager=self.manager)
            return old_app.run(
                app_runner, conf_tree, opts, args, uuid, work_files
            )

        # Load any rose_ana specific configuration settings either from
        # the site defaults or the user's personal config
        self._get_global_ana_config()

        # If the user's config indicates that it should be used - attach
        # to the KGO database instance in case it is needed later.
        use_kgo = self.ana_config.get("kgo-database", ".false.")
        self.kgo_db = None
        if use_kgo == TYPE_LOGICAL_VALUE_TRUE:
            self.kgo_db = KGODatabase()
            self.kgo_db.enter_task(
                self.task_name, self.kgo_db.TASK_STATUS_RUNNING
            )
            self.titlebar("Initialising KGO database")
            self.kgo_db.buffer_to_db(self.reporter)

        self.titlebar("Launching rose_ana")

        # Load available methods for analysis and the tasks in the app.
        self._load_analysis_modules()
        self._load_analysis_methods()
        self._load_tasks()

        # Get the number of desired threads from the ana config (if set).
        # If this is not set or set to 1 then fall back to the old behaviour
        # and don't use threads at all
        n_threads = int(self.ana_config.get("threads", 1))
        if n_threads == 1:
            results = self._run_tasks_serial()
        elif n_threads > 1:
            results = self._run_tasks_threaded(n_threads)
        else:
            # Zero or negative threads
            msg = "Number of threads given to rose_ana must be at least 1"
            raise ValueError(msg)
        number_of_failures, number_of_skips, task_error, summary_status = (
            results
        )

        # The KGO database (if needed by the task) also stores its status - to
        # indicate whether there was some unexpected exception above.
        if self.kgo_db is not None and not task_error:
            self.kgo_db.enter_task(
                self.task_name, self.kgo_db.TASK_STATUS_SUCCEEDED
            )
            self.titlebar("Updating KGO database")
            self.kgo_db.buffer_to_db(self.reporter)

        # Summarise the results of the tasks
        self.titlebar("Summary")
        for line, prefix in summary_status:
            self.reporter(line, prefix=prefix)

        # And a final 1-line summary
        total = len(summary_status)
        plural = {1: ""}
        prefix = self._prefix_pass
        passed = total - number_of_failures - number_of_skips
        msg = "{0} Task{1} Passed".format(passed, plural.get(passed, "s"))
        if number_of_failures > 0:
            msg += ", {0} Task{1} Failed".format(
                number_of_failures, plural.get(number_of_failures, "s")
            )
            prefix = self._prefix_fail
        if number_of_skips > 0:
            msg += ", {0} Task{1} Skipped".format(
                number_of_skips, plural.get(number_of_skips, "s")
            )
        msg += " (of {0} processed)".format(total)

        self.titlebar("Final status")
        self.reporter(msg, prefix=prefix)
        self.titlebar("Completed rose_ana")

        # Finally if there were legitimate test failures raise an exception
        # so that the task is caught by cylc as failed. Also fail if it looks
        # like every single task has been skipped
        if number_of_failures > 0 or number_of_skips == total:
            raise TestsFailedException(number_of_failures)

    def _run_tasks_serial(self):
        """Run every analysis task in turn, without using threads.

        Returns:
            tuple: (number_of_failures, number_of_skips, task_error,
            summary_status) where summary_status is a list of
            (message, prefix) pairs, one per task.
        """
        self.reporter("Running in SERIAL mode")
        number_of_failures = 0
        number_of_skips = 0
        task_error = False
        summary_status = []
        for itask, task in enumerate(self.analysis_tasks):
            # Report the name of the task and a banner line to aid
            # readability
            self.titlebar("Running task #{0}".format(itask + 1))
            self.reporter(
                "Method: {0}".format(task.options["full_task_name"])
            )
            # Since the run_analysis method is out of rose's control in
            # many cases the safest thing to do is a blanket try/except;
            # since we have no way of knowing what exceptions might be
            # raised.
            try:
                task.run_analysis()
                # In the case that the task didn't raise any exception,
                # we can now check whether it passed or failed.
                if task.passed:
                    prefix = self._prefix_pass
                    msg = "Task #{0} passed at {1}".format(
                        itask + 1, timestamp()
                    )
                elif task.skipped:
                    number_of_skips += 1
                    prefix = self._prefix_skip
                    msg = "Task #{0} skipped by method".format(itask + 1)
                else:
                    number_of_failures += 1
                    prefix = self._prefix_fail
                    msg = "Task #{0} did not pass at {1}".format(
                        itask + 1, timestamp()
                    )
                summary_status.append(
                    (
                        "{0} ({1})".format(
                            msg, task.options["full_task_name"]
                        ),
                        prefix,
                    )
                )
                self.reporter(msg, prefix=prefix)
            except Exception:
                # If an exception was raised, print a traceback and treat
                # it as a failure.
                task_error = True
                number_of_failures += 1
                msg = "Task #{0} encountered an error at {1}".format(
                    itask + 1, timestamp()
                )
                summary_status.append(
                    (
                        "{0} ({1})".format(
                            msg, task.options["full_task_name"]
                        ),
                        self._prefix_fail,
                    )
                )
                self.reporter(
                    msg + " (see stderr)", prefix=self._prefix_fail
                )
                exception = traceback.format_exc()
                self.reporter(
                    msg,
                    prefix=self._prefix_fail,
                    kind=self.reporter.KIND_ERR,
                )
                self.reporter(
                    exception,
                    prefix=self._prefix_fail,
                    kind=self.reporter.KIND_ERR,
                )
        return number_of_failures, number_of_skips, task_error, summary_status

    def _run_tasks_threaded(self, n_threads):
        """Run the analysis tasks in threads, at most n_threads at a time.

        Each thread's captured reporter output is passed to the main
        reporter once that thread has been joined, in task order.

        Returns:
            tuple: (number_of_failures, number_of_skips, task_error,
            summary_status), as for the serial case.
        """
        self.reporter(
            "Running in THREADED mode, with {0} threads".format(n_threads)
        )
        # Create threading objects for each comparison task
        threads = [
            TaskRunner(self, itask, task)
            for itask, task in enumerate(self.analysis_tasks)
        ]

        # Start threads within the set number of concurrent threads until
        # all have been started
        itask = 0
        running = []
        while itask < len(threads):
            # Build a new list of the live threads; removing entries from
            # the list while looping over it would skip some of them
            running = [thread for thread in running if thread.is_alive()]
            if len(running) >= n_threads:
                # All slots are busy; wait a moment rather than spinning
                time.sleep(self._thread_poll_interval)
                continue
            self.reporter(
                "Starting thread for task {0} at {1}".format(
                    itask + 1, timestamp()
                )
            )
            running.append(threads[itask])
            threads[itask].start()
            itask += 1

        # Gather up the results
        number_of_failures = 0
        number_of_skips = 0
        task_error = False
        summary_status = []
        for thread in threads:
            # If any threads haven't finished yet make sure to wait
            thread.join()
            number_of_failures += thread.failures
            number_of_skips += thread.skips
            task_error = task_error or thread.task_error
            summary_status += thread.summary_status
            # And print the output via the main reporter
            for args in thread.reporter_args:
                self.reporter(*args)
        return number_of_failures, number_of_skips, task_error, summary_status

    def titlebar(self, title, reporter=None):
        """Report the title surrounded by a banner of asterisks.

        The banner is sized from ``_printbar_width``; the app's own
        reporter is used unless another one is given.
        """
        if reporter is None:
            reporter = self.reporter
        sidebarlen = (self._printbar_width - len(title) + 1) / 2 - 1
        reporter("{0} {1} {0}".format("*" * int(sidebarlen), title))

    def _get_global_ana_config(self):
        """Retrieves all rose_ana config options; these could be from
        the site's settings or the user's personal settings."""
        self.ana_config = {}
        user_config = self.rose_conf.get_value(["rose-ana"])
        if user_config is not None:
            for name, obj in user_config.items():
                # Only settings with an empty state string are used
                if obj.state == "":
                    self.ana_config[name] = obj.value

    def _init_reporter(self, reporter=None):
        """Attach a reporter instance to the class."""
        if reporter is None:
            self.reporter = Reporter(self.opts.verbosity - self.opts.quietness)
        else:
            self.reporter = reporter

    def _load_analysis_modules(self):
        """Populate the list of modules containing analysis methods."""
        # Find the possible paths that could contain modules
        method_paths = self._get_method_paths()

        # Report the paths that were found
        self.reporter("Method module search-paths:")
        for path in method_paths:
            self.reporter(" * {0}".format(path))

        self.modules = set()
        for path in method_paths:
            # Add the method path to the start of the sys.path
            abs_path = os.path.abspath(path)
            sys.path.insert(0, abs_path)
            try:
                for filename in glob.glob(os.path.join(path, "*.py")):
                    # Find python files and attempt to import them; if a
                    # module fails to import report it but don't crash (the
                    # module may not actually be needed by this task)
                    module_name = os.path.splitext(
                        os.path.basename(filename)
                    )[0]
                    try:
                        self.modules.add(__import__(module_name))
                    except ImportError:
                        # Note: We intentionally don't re-raise the
                        # exception here, as we want to avoid a single
                        # mistake in a user supplied method bringing down
                        # the entire task
                        msg = "Failed to import module: {0} ".format(
                            module_name
                        )
                        self.reporter(
                            msg, prefix="[WARN] ", kind=self.reporter.KIND_ERR
                        )
                        exception = traceback.format_exc().split("\n")
                        for line in exception:
                            self.reporter(
                                line,
                                prefix="[WARN] ",
                                kind=self.reporter.KIND_ERR,
                            )
            finally:
                # Remove the method path from the sys.path, even when an
                # import raised something other than ImportError. It is
                # removed by value as an imported module may itself have
                # altered the start of sys.path.
                try:
                    sys.path.remove(abs_path)
                except ValueError:
                    # Already removed by an imported module; nothing to undo
                    pass

        self.modules = list(self.modules)
        self.modules.sort(key=str)

        # Report the modules which were loaded
        self.reporter("Method modules loaded:")
        for module in self.modules:
            self.reporter(" * {0}".format(module))

    def _load_analysis_methods(self):
        """Populate the list of analysis methods.

        Any class found in a loaded module which has a ``run_analysis``
        attribute is registered under the name "<module>.<class>".
        """
        self.methods = {}
        for module in self.modules:
            module_name = module.__name__
            method_classes = inspect.getmembers(module, inspect.isclass)
            for method_class_name, method_class in method_classes:
                if hasattr(method_class, "run_analysis"):
                    name = ".".join([module_name, method_class_name])
                    self.methods[name] = method_class

        # Report the methods which were loaded
        self.reporter("Methods available:")
        for method in sorted(self.methods):
            self.reporter(" * {0}".format(method))

    def _load_tasks(self):
        """Populate the list of analysis tasks from the app config."""
        # Fill a dictionary of tasks and extract their options and values
        # - skipping any which are user/trigger-ignored
        _tasks = {}
        for keys, node in self.config.walk(no_ignore=True):
            section = keys[0]
            # Capture the options only (entries with a section and an
            # option name) from the "ana:" sections
            if not section.startswith("ana:") or len(keys) != 2:
                continue
            task = section.split(":", 1)[1]

            # Process any environment variables first
            value = env_var_process(node.value)

            # The app may define a section containing rose_ana config
            # settings; add these to the config dictionary (if any of the
            # names match existing config options from the global config
            # they will be overwritten)
            if task == "config":
                self.ana_config[keys[1]] = value
                continue

            # If the value contains newlines, split it into a list
            # and either way remove any quotation marks
            values = [
                re.sub(r"^((?:'|\")*)(.*)(\1)$", r"\2", line)
                for line in value.split("\n")
            ]

            # If the user passed a blank curled-braces expression
            # it should be expanded to contain each of the arguments
            # passed to rose_ana
            if self.args:
                expanded = []
                for line in values:
                    if "{}" in line:
                        expanded.extend(
                            line.replace("{}", arg) for arg in self.args
                        )
                    else:
                        expanded.append(line)
                values = expanded

            if len(values) == 1:
                values = values[0]
            _tasks.setdefault(task, {})[keys[1]] = values

        # Can now populate the output task list with analysis objects
        self.analysis_tasks = []
        for name in sorted(_tasks):
            options = _tasks[name]
            options["full_task_name"] = name

            # Create an analysis object for each task, passing through
            # all options given to the section in the app, the given name
            # starts with the comparison type and then optionally a
            # name/description, extract this here
            match = re.match(r"(?P<atype>[\w\.]+)(?:\((?P<descr>.*)\)|)", name)
            if match:
                options["description"] = match.group("descr")
                atype = match.group("atype")
            else:
                # The name doesn't start with an analysis type; use the
                # whole name so it is reported as unrecognised below
                # (rather than re-using the type of the previous task)
                options["description"] = None
                atype = name

            # Assuming this analysis task has been loaded by the app, create
            # an instance of the task, passing the options to it
            if atype in self.methods:
                self.analysis_tasks.append(self.methods[atype](self, options))
            else:
                # If the analysis type isn't matched by one of the loaded
                # methods, report the error and return a placeholder
                # in its place (so that this tasks' main method can show
                # the task as "failed")
                error_message = "Unrecognised analysis type: {0}".format(
                    atype
                )
                self.reporter(error_message, prefix="[FAIL] ")

                # Create a simple object to return - when the run_analysis
                # method is called by the main loop it will simply raise
                # an exception, triggering the "error" trap
                class Dummy(AnalysisTask):
                    """Placeholder for an unrecognised analysis type."""

                    # The message is bound as a default argument so each
                    # placeholder keeps its own analysis type; a plain
                    # closure would see the value from the last task
                    def run_analysis(self, _error=error_message):
                        raise ImportError(_error)

                self.analysis_tasks.append(Dummy(self, options))

    def _get_method_paths(self):
        """Create a listing of paths for analysis methods."""
        # Setup the return list - the order of preference is earliest-first,
        # allowing methods to be overridden if sharing the same namespace
        method_paths = []

        # The app may define an "ana" directory specific to the app
        app_dir_var = "ROSE_TASK_APP"
        suite_dir = os.environ["ROSE_SUITE_DIR"]
        if app_dir_var not in os.environ:
            app_dir_var = "ROSE_TASK_NAME"
        app_dir = os.path.join(
            suite_dir, "app", os.environ[app_dir_var], "ana"
        )
        if os.path.exists(app_dir):
            method_paths.append(app_dir)

        # The suite can have an "ana" directory which the apps may use
        ana_dir = os.path.join(suite_dir, "ana")
        if os.path.exists(ana_dir):
            method_paths.append(ana_dir)

        # The rose config can specify a directory for site-specific
        # methods
        config_paths = self.rose_conf.get_value(["rose-ana", "method-path"])
        if config_paths:
            for config_dir in config_paths.split():
                if os.path.exists(config_dir):
                    method_paths.append(config_dir)

        # Finally there are some built-in methods within Rose itself
        method_paths.append(
            os.path.join(os.path.dirname(__file__), "ana_builtin")
        )
        return method_paths
class TestsFailedException(Exception):
    """Raised when one or more rose-ana comparisons do not pass."""

    def __init__(self, num_failed):
        # Number of tests which did not pass
        self.failed = num_failed

    def __str__(self):
        # Only a count of exactly one takes the singular form
        suffix = "" if self.failed == 1 else "s"
        return "{0} test{1} did not pass".format(self.failed, suffix)

    # The repr deliberately matches the human readable message
    __repr__ = __str__