Source code for rosie.ws_client

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright (C) 2012-2019 British Crown (Met Office) & Contributors.
#
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Rosie web service client.

Classes:
    RosieWSClient - sends requests, retrieves data from the web server

"""


import json
from multiprocessing import Pool
import requests
import shlex
import sys
from time import sleep

from rosie.suite_id import SuiteId
from rosie.ws_client_auth import RosieWSClientAuthManager
from rose.popen import RosePopener
from rose.reporter import Reporter
from rose.resource import ResourceLocator


class RosieWSClientConfError(Exception):

    """Raised if no Rosie service server is configured."""

    def __str__(self):
        return "[rosie-id] settings not defined in site/user configuration."


class RosieWSClientError(Exception):

    """Raised if no data were retrieved from the server."""

    def __str__(self):
        return ": ".join([str(arg) for arg in self.args])


class RosieWSClientQuerySplitError(RosieWSClientError):

    """Raised on query items syntax error."""

    def __str__(self):
        return "Query syntax error: " + " ".join(self.args[0])


[docs]class RosieWSClient(object): """A client for the Rosie web service. Args: prefixes (list): List of prefix names as strings. (run ``rose config rosie-id`` for more info). prompt_func (function): Optional function for requesting user credentials. Takes and returns the arguments username and password. popen (rose.popen.RosePopener): Use initiated RosePopener instance create a new one if ``None``. event_handler (object): A callable object for reporting popen output, see :py:class:`rose.reporter.Reporter`. """ MAX_LOCAL_QUERIES = 64 POLL_DELAY = 0.1 REMOVABLE_PARAMS = ["all_revs=0", "format=json"] def __init__(self, prefixes=None, prompt_func=None, popen=None, event_handler=None): if not event_handler: event_handler = Reporter() if not popen: popen = RosePopener(event_handler=event_handler) self.event_handler = event_handler self.popen = popen self.prompt_func = prompt_func self.prefixes = [] self.unreachable_prefixes = [] self.auth_managers = {} conf = ResourceLocator.default().get_conf() conf_rosie_id = conf.get(["rosie-id"], no_ignore=True) if conf_rosie_id is None: raise RosieWSClientConfError() for key, node in conf_rosie_id.value.items(): if node.is_ignored() or not key.startswith("prefix-ws."): continue prefix = key.replace("prefix-ws.", "") self.auth_managers[prefix] = RosieWSClientAuthManager( prefix, popen=self.popen, prompt_func=self.prompt_func) if not prefixes: prefixes_str = conf_rosie_id.get_value(["prefixes-ws-default"]) if prefixes_str: prefixes = shlex.split(prefixes_str) else: prefixes = sorted(self.auth_managers.keys()) self.set_prefixes(prefixes)
[docs] def set_prefixes(self, prefixes): """Replace the default prefixes. Args: prefixes (list): List of prefix names as strings. """ prefixes.sort() if self.prefixes != prefixes: self.prefixes = prefixes for prefix in self.prefixes: if prefix in self.auth_managers: continue self.auth_managers[prefix] = RosieWSClientAuthManager( prefix, popen=self.popen, prompt_func=self.prompt_func, event_handler=self.event_handler) # Remove uncontactable prefixes from the list. ok_prefixes = self.hello(return_ok_prefixes=True) prefixes = [] self.unreachable_prefixes = [] for prefix in self.prefixes: if prefix in ok_prefixes: prefixes.append(prefix) else: self.unreachable_prefixes.append(prefix)
self.prefixes = prefixes def _get(self, method, return_ok_prefixes=False, **kwargs): """Send an HTTP GET request to the known servers. Return a list, each element is the result from a successful request to a web server. """ # Gather the details of the requests to send if method == "address": url = kwargs.pop("url") url = self._remove_params(url) else: url = method request_details = {} if url.startswith("http"): for prefix in self.prefixes: auth_manager = self.auth_managers[prefix] if url.startswith(auth_manager.root): request_details[url] = self._create_request_detail( url, prefix, kwargs, auth_manager) break else: request_details[url] = self._create_request_detail( url, prefix, kwargs) else: for prefix in self.prefixes: auth_manager = self.auth_managers[prefix] full_url = auth_manager.root + url request_details[full_url] = self._create_request_detail( full_url, prefix, kwargs, auth_manager) if not request_details: raise RosieWSClientError(method, kwargs) # Filter security warnings from urllib3 on python <2.7.9. Obviously, we # want to upgrade, but some sites have to run cylc on platforms with # python <2.7.9. On those platforms, these warnings serve no purpose # except to annoy or confuse users. if sys.version_info < (2, 7, 9): import warnings try: from requests.packages.urllib3.exceptions import ( InsecurePlatformWarning) except ImportError: pass else: warnings.simplefilter("ignore", InsecurePlatformWarning) try: from requests.packages.urllib3.exceptions import ( SNIMissingWarning) except ImportError: pass else: warnings.simplefilter("ignore", SNIMissingWarning) # Process the requests in parallel pool = Pool(len(request_details)) results = {} for url, request_detail in request_details.items(): results[url] = pool.apply_async( requests.get, [url], request_detail["requests_kwargs"]) while results: for url, result in results.items(): if not result.ready(): continue results.pop(url) try: response = result.get() except (requests.exceptions.ConnectionError, requests.exceptions.MissingSchema) as exc: self.event_handler( RosieWSClientError(url, exc), level=1) continue request_detail = request_details[url] # Retry request once, if it fails with a 401 if (response.status_code == requests.codes["unauthorized"] and request_detail["can_retry"]): requests_kwargs = request_detail["requests_kwargs"] auth_manager = request_detail["auth_manager"] prev_auth = requests_kwargs["auth"] try: requests_kwargs["auth"] = auth_manager.get_auth( is_retry=True) except KeyboardInterrupt as exc: error = RosieWSClientError(url, kwargs, exc) self.event_handler(error, level=1) request_detail["can_retry"] = False else: results[url] = pool.apply_async( requests.get, [url], requests_kwargs) request_detail["can_retry"] = ( prev_auth != requests_kwargs["auth"]) continue request_detail["response"] = response if results: sleep(self.POLL_DELAY) pool.close() pool.join() # Process and return the results ret = [] for url, request_detail in sorted(request_details.items()): response = request_detail["response"] if response is None: continue try: response.raise_for_status() except requests.exceptions.RequestException: if request_detail["auth_manager"] is not None: request_detail["auth_manager"].clear_password() self.event_handler( RosieWSClientError(url, kwargs, response.status_code), level=1) continue if request_detail["auth_manager"] is not None: request_detail["auth_manager"].store_password() response_url = self._remove_params(response.url) try: response_data = json.loads(response.text) if return_ok_prefixes: ret.append(request_detail["prefix"]) else: ret.append((response_data, response_url)) except ValueError: self.event_handler( RosieWSClientError(url, kwargs), level=1) if not ret: raise RosieWSClientError(method, kwargs) return ret @classmethod def _remove_params(cls, url): """Remove removable parameters from url.""" for removable_param in cls.REMOVABLE_PARAMS: url = url.replace("?" + removable_param, "?") url = url.replace("&" + removable_param, "") return url.replace("?&", "?").rstrip("?") @classmethod def _create_request_detail(cls, url, prefix, params, auth_manager=None): """Helper for "_get". Return a dict with request details. The dict will be populated like this: { "url": url, "prefix": prefix, "auth_manager": auth_manager, "can_retry": False, "requests_kwargs": requests_kwargs, "response": None, } """ params = dict(params) params["format"] = "json" requests_kwargs = {"params": params} can_retry = (auth_manager is not None) if auth_manager: requests_kwargs.update(auth_manager.requests_kwargs) requests_kwargs["auth"] = auth_manager.get_auth() if requests_kwargs["auth"] is None: # None implies a failure to get auth, unlike '()'. can_retry = False requests_kwargs.pop("auth") return { "auth_manager": auth_manager, "can_retry": can_retry, "requests_kwargs": requests_kwargs, "response": None, "prefix": prefix, "url": url} def _get_keys(self, name): """Return named keys from web services.""" ret = [] for data, _ in self._get(name): for datum in data: if datum not in ret: ret.append(datum) return ret
[docs] def get_known_keys(self): """Return the known query keys."""
return self._get_keys("get_known_keys")
[docs] def get_optional_keys(self): """Return the optional query keys."""
return self._get_keys("get_optional_keys")
[docs] def get_query_operators(self): """Return the query operators."""
return self._get_keys("get_query_operators")
[docs] def hello(self, return_ok_prefixes=False): """Ask the server to say hello."""
return self._get("hello", return_ok_prefixes=return_ok_prefixes)
[docs] def query(self, q, **kwargs): """Query the Rosie database."""
return self._get("query", q=q, **kwargs)
[docs] def search(self, s, **kwargs): """Search the Rosie database for a matching string."""
return self._get("search", s=s, **kwargs)
[docs] def address_lookup(self, **kwargs): """Repeat a Rosie query or search by address."""
return self._get("address", **kwargs)
[docs] def query_local_copies(self, user=None): """Returns details of the local suites. As if they had been obtained using a search or query. """ suite_ids = [] for suite_id in SuiteId.get_checked_out_suite_ids(user=user): if suite_id.prefix in self.prefixes: suite_ids.append(suite_id) if not suite_ids: return [] # Simple query results = [] queued_suite_ids = list(suite_ids) while queued_suite_ids: # Batch up queries q_list = [] for _ in range(self.MAX_LOCAL_QUERIES): if not queued_suite_ids: break suite_id = queued_suite_ids.pop() q_list.append("or ( idx eq %s" % suite_id.idx) q_list.append("and branch eq %s )" % suite_id.branch) for data, _ in self.query(q_list): results.extend(data) result_idx_branches = [] for result in results: result_idx_branches.append((result[u"idx"], result[u"branch"])) # A branch may have been deleted - query with all_revs=1. # We only want to use all_revs on demand as it's slow. queued_suite_ids = [] for suite_id in suite_ids: if (suite_id.idx, suite_id.branch) not in result_idx_branches: queued_suite_ids.append(suite_id) if not queued_suite_ids: return results while queued_suite_ids: # Batch up queries q_list = [] for _ in range(self.MAX_LOCAL_QUERIES): if not queued_suite_ids: break suite_id = queued_suite_ids.pop() q_list.append("or ( idx eq %s" % suite_id.idx) q_list.append("and branch eq %s )" % suite_id.branch) more_results = [] for data, _ in self.query(q_list, all_revs=1): more_results.extend(data) new_results = {} for result in more_results: idx_branch = (result[u"idx"], result[u"branch"]) if (idx_branch not in new_results or result[u"revision"] > new_results[idx_branch][u"revision"]): new_results.update({idx_branch: result}) for _, result in sorted(new_results.items()): results.append(result)
return results
[docs] @classmethod def query_split(cls, args): """Split a list of arguments into a list of query items.""" args = list(args) if args[0] not in ["and", "or"]: args.insert(0, "and") q_list = [] # Query list q_item = [] # Individual query pieces list level = 0 # Number of open brackets while args: arg = args.pop(0) arg_1 = args[0] if args else None if (arg in ["and", "or"] and arg_1 not in ["and", "or"]): if len(q_item) >= 4: q_list.append(q_item) q_item = [] elif not args: q_item.append(arg) if len(q_item) < 4: raise RosieWSClientQuerySplitError(args) q_list.append(q_item) q_item = [] q_item.append(arg) level += len(arg) if all([c == "(" for c in arg]) else 0 level -= len(arg) if all([c == ")" for c in arg]) else 0 if ( len(q_item) > 1 or level != 0 or any(len(q_item) not in [4, 5, 6] for q_item in q_list) ): raise RosieWSClientQuerySplitError(args)
return q_list