# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""The following options can be specified in:

* :rose:conf:`rose.conf[rose-ana]`
* :rose:conf:`rose_ana[ana:config]`

.. describe:: Options

   grepper-report-limit:
      A numerical value giving the maximum number of informational output
      lines to print for each comparison. This is intended for cases where
      for example a pattern-matching comparison is expected to match many
      thousands of occurrences in the given files; it may not be desirable
      to print the results of every comparison. After the given number of
      lines are printed a special message indicating that the rest of the
      output is truncated will be produced.
   skip-if-all-files-missing:
      Can be set to ``.true.`` or ``.false.``; if active, any comparison
      done on files by ``grepper`` will be skipped if all of those files
      are non-existent. In this case the task will return as "skipped"
      rather than passed/failed.

"""
import os
import re
from metomi.rose import TYPE_LOGICAL_VALUE_TRUE
from metomi.rose.apps.rose_ana import AnalysisTask
class SingleCommandStatus(AnalysisTask):
    """Run a shell command, passing or failing depending on the exit
    status of that command.

    Options:
        files (optional):
            A newline-separated list of filenames which may appear
            in the command.
        command:
            The command to run; if it contains Python style
            format specifiers these will be expanded using the list of
            files above (if provided).
        kgo_file:
            If the list of files above was provided gives the
            (0-based) index of the file holding the "kgo" or "control"
            output for use with the comparisons database (if active).

    """

    def run_analysis(self):
        """Main analysis routine called from rose_ana.

        Processes the task options and configuration, then (unless the
        task is skipped) runs the command and records the outcome in the
        KGO database.
        """
        self.process_opt_files()
        self.process_opt_kgo()
        self.process_opt_command()
        # NOTE(review): process_opt_unhandled is not defined in this
        # module; presumably inherited from AnalysisTask - confirm.
        self.process_opt_unhandled()
        self.get_config_opts()
        if self.check_for_skip():
            return
        self.run_command_and_check()
        self.update_kgo()

    def run_command_and_check(self):
        """Run the command and set the pass state from its exit status.

        A zero return code reports stdout as information and sets
        ``self.passed`` to True. Any other return code reports stdout and
        stderr with a failure prefix and leaves ``self.passed`` untouched.
        """
        # If the user has specified a KGO file, but it is missing, exit early
        if self.kgo is not None:
            kgo_file = self.files[self.kgo]
            if not os.path.exists(kgo_file):
                self.reporter(
                    "KGO File (file {0}) appears to be missing".format(
                        self.kgo + 1
                    ),
                    prefix="[FAIL] ",
                )
                # Note that by exiting early this task counts as failed
                return

        # The command may contain format substitution characters, which will
        # receive any filenames passed to the app.
        self.command = self.command.format(*self.files)
        returncode, stdout, stderr = self.run_command(self.command)
        if returncode == 0:
            self.reporter(stdout, prefix="[INFO] ")
            self.passed = True
        else:
            self.reporter("STDOUT:", prefix="[FAIL] ")
            self.reporter(stdout, prefix="[FAIL] ")
            self.reporter("STDERR:", prefix="[FAIL] ")
            self.reporter(stderr, prefix="[FAIL] ")

    def check_for_skip(self):
        """If the user's config options specified that the task should be
        ignored if all of its files were missing, set skipped attribute here.

        Returns the value of ``self.skipped``.
        """
        if self.skip_if_missing and self.files:
            if not any(os.path.exists(fname) for fname in self.files):
                self.skipped = True
                self.reporter(
                    "All file arguments are missing, skipping task since "
                    "'skip-if-all-files-missing' is '{0}'".format(
                        TYPE_LOGICAL_VALUE_TRUE
                    )
                )
        # NOTE(review): self.skipped is only assigned above when skipping;
        # its initial value presumably comes from AnalysisTask - confirm.
        return self.skipped

    def get_config_opts(self):
        """Process any configuration options.

        Sets ``self.max_report_lines`` (an int, or None when the limit is
        unset or not a plain non-negative integer) and
        ``self.skip_if_missing`` (a bool).
        """
        report_limit = self.config.get("grepper-report-limit", None)
        self.max_report_lines = None
        if report_limit is not None and report_limit.isdigit():
            self.max_report_lines = int(report_limit)

        skip_missing = self.config.get("skip-if-all-files-missing", None)
        self.skip_if_missing = False
        if (
            skip_missing is not None
            and skip_missing == TYPE_LOGICAL_VALUE_TRUE
        ):
            self.skip_if_missing = True

    def process_opt_files(self):
        """Process the files option; a list of one or more filenames.

        The result is stored on ``self.files``; it is an empty list when
        the option was not given.
        """
        # Get the file list from the options dictionary
        files = self.options.pop("files", None)
        # Make sure it appears as a sensible list
        if files is None:
            files = []
        elif isinstance(files, str):
            files = [files]
        # Report the filenames (with paths), numbering them from 1
        for ifile, fname in enumerate(files):
            self.reporter(
                "File {0}: {1}".format(ifile + 1, os.path.abspath(fname))
            )
        self.files = files

    def process_opt_kgo(self):
        """
        Process the KGO option; an index indicating which file (if any) is
        the KGO (Known Good Output) - this may be needed later to assist in
        updating of test results.

        The parsed index (an int, or None when the option is absent or
        blank) is stored on ``self.kgo``. Must be called after
        ``process_opt_files`` because the index is validated against
        ``self.files``.

        Raises:
            ValueError: if the value is not blank or a string of digits,
                or if it does not index an entry of ``self.files``.
        """
        # Get the kgo index from the options dictionary
        kgo = self.options.pop("kgo_file", None)
        # Parse the kgo index
        if kgo is not None:
            if kgo.strip() == "":
                kgo = None
            elif kgo.isdigit():
                kgo = int(kgo)
                # The index is 0-based, so a value equal to the number of
                # files is already out of range; rejecting it here avoids
                # an IndexError when self.files[self.kgo] is read later.
                if kgo >= len(self.files):
                    msg = "KGO index cannot be greater than number of files"
                    raise ValueError(msg)
                self.reporter("KGO is file {0}".format(kgo + 1))
            else:
                msg = (
                    "KGO index not recognised; should be either a digit or "
                    "left blank"
                )
                raise ValueError(msg)
        self.kgo = kgo

    def process_opt_command(self):
        """
        Process the command option; this is the (shell) command that will
        be run for this task.

        Raises:
            ValueError: if no command was specified.
        """
        # Get the command from the options
        self.command = self.options.pop("command", None)
        if self.command is not None:
            self.reporter("Command: {0}".format(self.command))
        else:
            msg = "Command not specified"
            raise ValueError(msg)

    def run_command(self, command):
        """Simple command runner; returns return code, output and error."""
        # The command string is handed to a shell unmodified.
        retcode, stdout, stderr = self.popen.run(command, shell=True)
        return retcode, stdout, stderr

    def read_file(self, filename):
        """Return the content of a given file as a list of lines."""
        with open(filename, "r") as ifile:
            output = ifile.read().splitlines()
        return output

    def update_kgo(self):
        """
        Update the KGO database with the status of any files marked by the
        kgo_file option (i.e. whether they have passed/failed the test).

        Does nothing when no KGO index was given or ``self.kgo_db`` is None.
        """
        if self.kgo is not None and self.kgo_db is not None:
            # Identify the KGO file from its index
            kgo_file = self.files[self.kgo]
            # Now find the other file/s (this is presently designed to expect
            # there to be 1 KGO and 1 non-KGO file)
            for ifile, suite_file in enumerate(self.files):
                if ifile == self.kgo:
                    continue
                self.kgo_db.enter_comparison(
                    self.options["full_task_name"],
                    os.path.abspath(kgo_file),
                    os.path.abspath(suite_file),
                    # Indexing by the pass flag: False -> "FAIL",
                    # True -> " OK "
                    ["FAIL", " OK "][self.passed],
                    "Compared using grepper",
                )
class SingleCommandPattern(SingleCommandStatus):
    """Run a single command and pass if a given expression is found in
    that command's standard output.

    Options:
        files (optional):
            Same as previous task.
        command:
            Same as previous task.
        kgo_file:
            Same as previous task.
        pattern:
            The regular expression to search for in the stdout from the
            command.

    """

    def run_analysis(self):
        """Main analysis routine called from rose_ana."""
        # Only the extra "pattern" option differs from the parent class, so
        # consume it first and let the parent drive the rest.
        self.process_opt_pattern()
        super().run_analysis()

    def process_opt_pattern(self):
        """
        Take the pattern option (a regular expression to be checked against
        the command output) from the options and store it.

        Raises:
            ValueError: if no pattern was specified.
        """
        pattern = self.options.pop("pattern", None)
        self.pattern = pattern
        if pattern is None:
            raise ValueError("Must specify a pattern")
        self.reporter("Pattern: {0}".format(pattern))

    def run_command_and_check(self):
        """
        Run the command and mark the task as passed when the pattern is
        found in its standard output; the return code is not examined.
        """
        # A KGO file that was requested but does not exist ends the check
        # early, which leaves the task counted as failed.
        kgo_index = self.kgo
        if kgo_index is not None and not os.path.exists(
            self.files[kgo_index]
        ):
            self.reporter(
                "KGO File (file {0}) appears to be missing".format(
                    kgo_index + 1
                ),
                prefix="[FAIL] ",
            )
            return

        # Any format fields in the command are filled from the file list.
        self.command = self.command.format(*self.files)
        _, stdout, _ = self.run_command(self.command)
        if re.search(self.pattern, stdout):
            self.passed = True
class FilePattern(SingleCommandPattern):
    """Check for occurrences of a particular expression or value within the
    contents of two or more files.

    Options:
        files (optional):
            Same as previous tasks.
        kgo_file:
            Same as previous tasks.
        pattern:
            The regular expression to search for in the files. The
            expression should include one or more capture groups; each
            of these will be compared between the files any time the
            pattern occurs.
        tolerance (optional):
            By default the above comparisons will be compared exactly,
            but if this argument is specified they will be converted to
            float values and compared according to the given tolerance.
            If this tolerance ends in % it will be interpreted as a
            relative tolerance (otherwise absolute).

    """

    def run_analysis(self):
        """Main analysis routine called from rose_ana.

        Collects the pattern's capture groups from every file, then
        compares each file's groups against those of the first file,
        occurrence by occurrence. The task passes only if every comparison
        passes.

        Raises:
            ValueError: if the files do not all match the pattern the same
                number of times, or if a tolerance was given but a captured
                group cannot be converted to a float.
        """
        self.process_opt_files()
        self.process_opt_kgo()
        self.process_opt_pattern()
        self.process_opt_tolerance()
        self.process_opt_unhandled()
        self.get_config_opts()
        if self.check_for_skip():
            return

        # If the user has specified a KGO file, but it is missing, exit early
        if self.kgo is not None:
            kgo_file = self.files[self.kgo]
            if not os.path.exists(kgo_file):
                self.reporter(
                    "KGO File (file {0}) appears to be missing".format(
                        self.kgo + 1
                    ),
                    prefix="[FAIL] ",
                )
                # Note that by exiting early this task counts as failed
                return

        # Generate the groupings - the pattern can match multiple times
        matched_groups = self.search_for_matches()

        # Check that the number of matchings found is equal in all files.
        # The counts are looked up per entry of self.files (rather than
        # taken from the dictionary's values) so they stay aligned with
        # self.files even if the same filename is listed more than once.
        group_lens = [len(matched_groups[fname]) for fname in self.files]
        for igroup, group_len in enumerate(group_lens[1:]):
            if group_len != group_lens[0]:
                msg = (
                    "File ({0}) matches pattern {1} times, but File ({2}) "
                    "matches it {3} times, cannot test"
                )
                raise ValueError(
                    msg.format(
                        self.files[0],
                        group_lens[0],
                        self.files[igroup + 1],
                        group_len,
                    )
                )

        # Compare the result of each matching; the first file is the
        # reference which every other file is compared against
        passed = [True] * len(self.files)
        comparison_total = 0
        failure_total = 0
        for igroup in range(group_lens[0]):
            ref_group = matched_groups[self.files[0]][igroup]
            for ifile, fname in enumerate(self.files[1:]):
                group = matched_groups[fname][igroup]
                for imatch, (match1, match2) in enumerate(
                    zip(ref_group, group)
                ):
                    # If a tolerance was given, the matches must be numbers
                    failed = False
                    comparison_total += 1
                    if self.tolerance is not None:
                        try:
                            match1 = float(match1)
                            match2 = float(match2)
                        except ValueError as err:
                            msg = (
                                "Cannot do tolerance comparison, groups "
                                "matched by pattern are not reals"
                            )
                            raise ValueError(msg) from err
                        if self.relative_tol:
                            # Scaling a negative value swaps the two
                            # bounds, which would make the range check
                            # below impossible to satisfy; order them
                            # explicitly so negative values compare
                            # correctly too.
                            lower, upper = sorted(
                                (
                                    match2 * (1.0 - 0.01 * self.tolerance),
                                    match2 * (1.0 + 0.01 * self.tolerance),
                                )
                            )
                        else:
                            lower = match2 - self.tolerance
                            upper = match2 + self.tolerance
                        if not lower <= match1 <= upper:
                            failed = True
                    elif match1 != match2:
                        failed = True

                    # Update the state of the current file if it failed above
                    if failed:
                        passed[ifile + 1] = False
                        failure_total += 1

                    # Now move on to report the output of the comparison (if
                    # the user's config limits the amount of output skip this)
                    if (
                        self.max_report_lines is not None
                        and comparison_total > self.max_report_lines
                    ):
                        continue

                    if failed:
                        msg = (
                            "Mismatch in group {0} of pattern for "
                            "occurrence {1} in files"
                        )
                        prefix = "[FAIL] "
                        self.reporter(
                            msg.format(imatch + 1, igroup + 1), prefix=prefix
                        )
                        msg = "File {0}: {1}"
                        self.reporter(msg.format(1, match1), prefix=prefix)
                        self.reporter(
                            msg.format(ifile + 2, match2), prefix=prefix
                        )
                    else:
                        msg = (
                            "Group {0} of pattern for occurrence {1} in "
                            "files matches"
                        )
                        self.reporter(
                            msg.format(imatch + 1, igroup + 1),
                            level=self.reporter.V,
                        )
                        if self.tolerance is None:
                            # Exact comparison: both values are identical
                            msg = "Value: {0}"
                            self.reporter(
                                msg.format(match1), level=self.reporter.V
                            )
                        else:
                            msg = "File {0}: {1}"
                            self.reporter(
                                msg.format(1, match1), level=self.reporter.V
                            )
                            self.reporter(
                                msg.format(ifile + 2, match2),
                                level=self.reporter.V,
                            )

        # If not all comparisons were printed, note it here
        if (
            self.max_report_lines is not None
            and comparison_total > self.max_report_lines
        ):
            self.reporter("... Some output omitted due to limit ...")

        msg = "Performed {0} comparison{1}, with {2} failure{3}"
        self.reporter(
            msg.format(
                comparison_total,
                # Pluralise unless the count is exactly one
                {1: ""}.get(comparison_total, "s"),
                failure_total,
                {1: ""}.get(failure_total, "s"),
            )
        )

        # If everything passed - the task did too
        self.passed = all(passed)
        self.update_kgo()

    def process_opt_tolerance(self):
        """
        Process the tolerance option; a value given either an absolute or
        relative tolerance which a numeric value must lie within.

        Sets ``self.tolerance`` (a float, or None when the option was not
        given) and ``self.relative_tol`` (True when the value ended in %).

        Raises:
            ValueError: if the value cannot be converted to a float.
        """
        # Get the tolerance from the options dictionary
        tolerance = self.options.pop("tolerance", None)
        # Convert the tolerance
        self.relative_tol = False
        if tolerance is not None:
            # Determine what type of tolerance it is and set the flag
            if tolerance.endswith("%"):
                self.relative_tol = True
                tolerance = float(tolerance.strip("%"))
                self.reporter("Relative (%) tolerance: {0}".format(tolerance))
            else:
                tolerance = float(tolerance)
                self.reporter("Absolute tolerance: {0}".format(tolerance))
        self.tolerance = tolerance

    def search_for_matches(self):
        """
        Search the contents of the files for the patterns; returning a
        dictionary whose keys are the file-names and whose values are
        lists of the groupings (one for each occurrence)

        Each line is searched once, so at most one occurrence is recorded
        per line.
        """
        matched_groups = {}
        for fname in self.files:
            matched_groups[fname] = []
            for line in self.read_file(fname):
                search = re.search(self.pattern, line)
                if search:
                    matched_groups[fname].append(search.groups())
        return matched_groups
class FileCommandPattern(FilePattern):
    """Compare values captured by an expression from the standard output
    of a command that is run once for each of two or more files.

    Options:
        files (optional):
            Same as previous tasks.
        kgo_file:
            Same as previous tasks.
        pattern:
            Same as previous tasks.
        tolerance (optional):
            Same as previous tasks.
        command:
            The command to run; it should contain a Python style format
            specifier to be expanded using the list of files above.

    """

    def run_analysis(self):
        """Main analysis routine called from rose_ana."""
        # The only addition over the parent class is the "command" option;
        # consume it, then defer to the parent for everything else.
        self.process_opt_command()
        super().run_analysis()

    def search_for_matches(self):
        """
        Run the command once per file and scan each line of its output for
        the pattern.

        Returns a dictionary keyed by file-name whose values are lists of
        the captured groups, one entry per matching line.

        Raises:
            ValueError: if the command gives a non-zero return code.
        """
        results = {}
        for fname in self.files:
            results[fname] = []
            status, out, err = self.run_command(self.command.format(fname))
            if status != 0:
                raise ValueError("Command failed, stderr: {0}".format(err))
            hits = (re.search(self.pattern, line) for line in out.split("\n"))
            results[fname] = [hit.groups() for hit in hits if hit]
        return results