From 9bfdd871e90d80a85825dcd6f79a095e1b787083 Mon Sep 17 00:00:00 2001
From: mg
Date: Thu, 4 Nov 2021 18:17:00 +0100
Subject: [PATCH] ara: add the callback plugin to the repo (#235)

Co-authored-by: Michael Grote
Reviewed-on: https://git.mgrote.net/mg/ansible/pulls/235
Co-authored-by: mg
Co-committed-by: mg
---
 ansible.cfg                     |   2 +-
 plugins/callback/ara_default.py | 618 ++++++++++++++++++++++++++++++++
 2 files changed, 619 insertions(+), 1 deletion(-)
 create mode 100644 plugins/callback/ara_default.py

diff --git a/ansible.cfg b/ansible.cfg
index 63960529..7adfaa5f 100644
--- a/ansible.cfg
+++ b/ansible.cfg
@@ -10,7 +10,7 @@ vault_password_file = vault-pass.yml
 gathering = smart
 #display_ok_hosts = no # zeigt nur noch changed und error tasks/hosts an
 #display_skipped_hosts = yes # dito
-callback_plugins = /home/mg/.local/lib/python3.8/site-packages/ara/plugins/callback
+callback_plugins = ./plugins/callback
 # python3 -m ara.setup.callback_plugins
 
 [inventory]

diff --git a/plugins/callback/ara_default.py b/plugins/callback/ara_default.py
new file mode 100644
index 00000000..a16202fb
--- /dev/null
+++ b/plugins/callback/ara_default.py
@@ -0,0 +1,618 @@
+# Copyright (c) 2018 Red Hat, Inc.
+#
+# This file is part of ARA: Ansible Run Analysis.
+#
+# ARA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# ARA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with ARA. If not, see <https://www.gnu.org/licenses/>.
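+#
+# Note for this repository: this file is a copy of ARA's default callback
+# plugin, carried in the repo so that "callback_plugins = ./plugins/callback"
+# in ansible.cfg can load it without pointing at a site-packages path.
+# The "ara" Python package still has to be installed on the control node,
+# because the plugin imports ara.clients below; the helper mentioned in
+# ansible.cfg, "python3 -m ara.setup.callback_plugins", can be used to
+# locate the copy shipped with the installed package.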
+ +from __future__ import absolute_import, division, print_function + +import datetime +import json +import logging +import os +import socket +from concurrent.futures import ThreadPoolExecutor + +from ansible import __version__ as ansible_version +from ansible.parsing.ajson import AnsibleJSONEncoder +from ansible.plugins.callback import CallbackBase +from ansible.vars.clean import module_response_deepcopy, strip_internal_keys + +from ara.clients import utils as client_utils + +# Ansible CLI options are now in ansible.context in >= 2.8 +# https://github.com/ansible/ansible/commit/afdbb0d9d5bebb91f632f0d4a1364de5393ba17a +try: + from ansible import context + + cli_options = {key: value for key, value in context.CLIARGS.items()} +except ImportError: + # < 2.8 doesn't have ansible.context + try: + from __main__ import cli + + cli_options = cli.options.__dict__ + except ImportError: + # using API without CLI + cli_options = {} + + +DOCUMENTATION = """ +callback: ara +callback_type: notification +requirements: + - ara +short_description: Sends playbook execution data to the ARA API internally or over HTTP +description: + - Sends playbook execution data to the ARA API internally or over HTTP +options: + api_client: + description: The client to use for communicating with the API + default: offline + env: + - name: ARA_API_CLIENT + ini: + - section: ara + key: api_client + choices: ['offline', 'http'] + api_server: + description: When using the HTTP client, the base URL to the ARA API server + default: http://127.0.0.1:8000 + env: + - name: ARA_API_SERVER + ini: + - section: ara + key: api_server + api_username: + description: If authentication is required, the username to authenticate with + default: null + env: + - name: ARA_API_USERNAME + ini: + - section: ara + key: api_username + api_password: + description: If authentication is required, the password to authenticate with + default: null + env: + - name: ARA_API_PASSWORD + ini: + - section: ara + key: api_password + api_insecure: + description: Can be enabled to ignore SSL certification of the API server + type: bool + default: false + env: + - name: ARA_API_INSECURE + ini: + - section: ara + key: api_insecure + api_timeout: + description: Timeout, in seconds, before giving up on HTTP requests + type: integer + default: 30 + env: + - name: ARA_API_TIMEOUT + ini: + - section: ara + key: api_timeout + argument_labels: + description: | + A list of CLI arguments that, if set, will be automatically applied to playbooks as labels. + Note that CLI arguments are not always named the same as how they are represented by Ansible. + For example, --limit is "subset", --user is "remote_user" but --check is "check". 
+ type: list + default: + - remote_user + - check + - tags + - skip_tags + - subset + env: + - name: ARA_ARGUMENT_LABELS + ini: + - section: ara + key: argument_labels + default_labels: + description: A list of default labels that will be applied to playbooks + type: list + default: [] + env: + - name: ARA_DEFAULT_LABELS + ini: + - section: ara + key: default_labels + ignored_facts: + description: List of host facts that will not be saved by ARA + type: list + default: ["ansible_env"] + env: + - name: ARA_IGNORED_FACTS + ini: + - section: ara + key: ignored_facts + ignored_arguments: + description: List of Ansible arguments that will not be saved by ARA + type: list + default: ["extra_vars"] + env: + - name: ARA_IGNORED_ARGUMENTS + ini: + - section: ara + key: ignored_arguments + ignored_files: + description: List of patterns that will not be saved by ARA + type: list + default: [] + env: + - name: ARA_IGNORED_FILES + ini: + - section: ara + key: ignored_files + callback_threads: + description: + - The number of threads to use in API client thread pools + - When set to 0, no threading will be used (default) which is appropriate for usage with sqlite + - Using threads is recommended when the server is using MySQL or PostgreSQL + type: integer + default: 0 + env: + - name: ARA_CALLBACK_THREADS + ini: + - section: ara + key: callback_threads +""" + + +class CallbackModule(CallbackBase): + """ + Saves data from an Ansible run into a database + """ + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = "awesome" + CALLBACK_NAME = "ara_default" + + def __init__(self): + super(CallbackModule, self).__init__() + self.log = logging.getLogger("ara.plugins.callback.default") + # These are configured in self.set_options + self.client = None + self.callback_threads = None + + self.ignored_facts = [] + self.ignored_arguments = [] + self.ignored_files = [] + + self.result = None + self.result_started = {} + self.result_ended = {} + self.task = None + self.play = None + self.playbook = None + self.stats = None + self.file_cache = {} + self.host_cache = {} + self.task_cache = {} + self.delegation_cache = {} + + def set_options(self, task_keys=None, var_options=None, direct=None): + super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct) + + self.argument_labels = self.get_option("argument_labels") + self.default_labels = self.get_option("default_labels") + self.ignored_facts = self.get_option("ignored_facts") + self.ignored_arguments = self.get_option("ignored_arguments") + self.ignored_files = self.get_option("ignored_files") + + client = self.get_option("api_client") + endpoint = self.get_option("api_server") + timeout = self.get_option("api_timeout") + username = self.get_option("api_username") + password = self.get_option("api_password") + insecure = self.get_option("api_insecure") + self.client = client_utils.get_client( + client=client, + endpoint=endpoint, + timeout=timeout, + username=username, + password=password, + verify=False if insecure else True, + ) + + # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter. + # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE. 
+ # Otherwise we can hit "urllib3.connectionpool: Connection pool is full" + self.callback_threads = self.get_option("callback_threads") + if self.callback_threads > 4: + self.callback_threads = 4 + + def _submit_thread(self, threadpool, func, *args, **kwargs): + # Manages whether or not the function should be threaded to keep things DRY + if self.callback_threads: + # Pick from one of two thread pools (global or task) + threads = getattr(self, threadpool + "_threads") + threads.submit(func, *args, **kwargs) + else: + func(*args, **kwargs) + + def v2_playbook_on_start(self, playbook): + self.log.debug("v2_playbook_on_start") + + if self.callback_threads: + self.global_threads = ThreadPoolExecutor(max_workers=self.callback_threads) + self.log.debug("Global thread pool initialized with %s thread(s)" % self.callback_threads) + + content = None + + if playbook._file_name == "__adhoc_playbook__": + content = cli_options["module_name"] + if cli_options["module_args"]: + content = "{0}: {1}".format(content, cli_options["module_args"]) + path = "Ad-Hoc: {0}".format(content) + else: + path = os.path.abspath(playbook._file_name) + + # Potentially sanitize some user-specified keys + for argument in self.ignored_arguments: + if argument in cli_options: + self.log.debug("Ignoring argument: %s" % argument) + cli_options[argument] = "Not saved by ARA as configured by 'ignored_arguments'" + + # Retrieve and format CLI options for argument labels + argument_labels = [] + for label in self.argument_labels: + if label in cli_options: + # Some arguments are lists or tuples + if isinstance(cli_options[label], tuple) or isinstance(cli_options[label], list): + # Only label these if they're not empty + if cli_options[label]: + argument_labels.append("%s:%s" % (label, ",".join(cli_options[label]))) + # Some arguments are booleans + elif isinstance(cli_options[label], bool): + argument_labels.append("%s:%s" % (label, cli_options[label])) + # The rest can be printed as-is if there is something set + elif cli_options[label]: + argument_labels.append("%s:%s" % (label, cli_options[label])) + self.argument_labels = argument_labels + + # Create the playbook + self.playbook = self.client.post( + "/api/v1/playbooks", + ansible_version=ansible_version, + arguments=cli_options, + status="running", + path=path, + controller=socket.getfqdn(), + started=datetime.datetime.now(datetime.timezone.utc).isoformat(), + ) + + # Record the playbook file + self._submit_thread("global", self._get_or_create_file, path, content) + + return self.playbook + + def v2_playbook_on_play_start(self, play): + self.log.debug("v2_playbook_on_play_start") + self._end_task() + self._end_play() + + # Load variables to verify if there is anything relevant for ara + play_vars = play._variable_manager.get_vars(play=play)["vars"] + if "ara_playbook_name" in play_vars: + self._submit_thread("global", self._set_playbook_name, play_vars["ara_playbook_name"]) + + labels = self.default_labels + self.argument_labels + if "ara_playbook_labels" in play_vars: + # ara_playbook_labels can be supplied as a list inside a playbook + # but it might also be specified as a comma separated string when + # using extra-vars + if isinstance(play_vars["ara_playbook_labels"], list): + labels.extend(play_vars["ara_playbook_labels"]) + elif isinstance(play_vars["ara_playbook_labels"], str): + labels.extend(play_vars["ara_playbook_labels"].split(",")) + else: + raise TypeError("ara_playbook_labels must be a list or a comma-separated string") + if labels: + 
self._submit_thread("global", self._set_playbook_labels, labels) + + # Record all the files involved in the play + for path in play._loader._FILE_CACHE.keys(): + self._submit_thread("global", self._get_or_create_file, path) + + # Note: ansible-runner suffixes play UUIDs when running in serial so 34cff6f4-9f8e-6137-3461-000000000005 can + # end up being 34cff6f4-9f8e-6137-3461-000000000005_2. Remove anything beyond standard 36 character UUIDs. + # https://github.com/ansible-community/ara/issues/211 + # Create the play + self.play = self.client.post( + "/api/v1/plays", + name=play.name, + status="running", + uuid=play._uuid[:36], + playbook=self.playbook["id"], + started=datetime.datetime.now(datetime.timezone.utc).isoformat(), + ) + + return self.play + + def v2_playbook_on_handler_task_start(self, task): + self.log.debug("v2_playbook_on_handler_task_start") + # TODO: Why doesn't `v2_playbook_on_handler_task_start` have is_conditional ? + self.v2_playbook_on_task_start(task, False, handler=True) + + def v2_playbook_on_task_start(self, task, is_conditional, handler=False): + self.log.debug("v2_playbook_on_task_start") + self._end_task() + + if self.callback_threads: + self.task_threads = ThreadPoolExecutor(max_workers=self.callback_threads) + self.log.debug("Task thread pool initialized with %s thread(s)" % self.callback_threads) + + pathspec = task.get_path() + if pathspec: + path, lineno = pathspec.split(":", 1) + lineno = int(lineno) + else: + # Task doesn't have a path, default to "something" + path = self.playbook["path"] + lineno = 1 + + # Get task file + task_file = self._get_or_create_file(path) + + # Get task + self.task = self._get_or_create_task(task, task_file["id"], lineno, handler) + + return self.task + + def v2_runner_on_start(self, host, task): + # v2_runner_on_start was added in 2.8 so this doesn't get run for Ansible 2.7 and below. + self.result_started[host.get_name()] = datetime.datetime.now(datetime.timezone.utc).isoformat() + + def v2_runner_on_ok(self, result, **kwargs): + self._submit_thread("task", self._load_result, result, "ok", **kwargs) + + def v2_runner_on_unreachable(self, result, **kwargs): + self._submit_thread("task", self._load_result, result, "unreachable", **kwargs) + + def v2_runner_on_failed(self, result, **kwargs): + self._submit_thread("task", self._load_result, result, "failed", **kwargs) + + def v2_runner_on_skipped(self, result, **kwargs): + self._submit_thread("task", self._load_result, result, "skipped", **kwargs) + + def v2_runner_item_on_ok(self, result): + self._update_delegation_cache(result) + + def v2_runner_item_on_failed(self, result): + self._update_delegation_cache(result) + + def v2_runner_item_on_skipped(self, result): + pass + # result._task.delegate_to can end up being a variable from this hook, don't save it. 
+ # https://github.com/ansible/ansible/issues/75339 + # self._update_delegation_cache(result) + + def v2_playbook_on_stats(self, stats): + self.log.debug("v2_playbook_on_stats") + self._end_task() + self._end_play() + self._load_stats(stats) + self._end_playbook(stats) + + def _end_task(self): + if self.task is not None: + self._submit_thread( + "task", + self.client.patch, + "/api/v1/tasks/%s" % self.task["id"], + status="completed", + ended=datetime.datetime.now(datetime.timezone.utc).isoformat(), + ) + if self.callback_threads: + # Flush threads before moving on to next task to make sure all results are saved + self.log.debug("waiting for task threads...") + self.task_threads.shutdown(wait=True) + self.task_threads = None + self.task = None + + def _end_play(self): + if self.play is not None: + self._submit_thread( + "global", + self.client.patch, + "/api/v1/plays/%s" % self.play["id"], + status="completed", + ended=datetime.datetime.now(datetime.timezone.utc).isoformat(), + ) + self.play = None + + def _end_playbook(self, stats): + status = "unknown" + if len(stats.failures) >= 1 or len(stats.dark) >= 1: + status = "failed" + else: + status = "completed" + + self._submit_thread( + "global", + self.client.patch, + "/api/v1/playbooks/%s" % self.playbook["id"], + status=status, + ended=datetime.datetime.now(datetime.timezone.utc).isoformat(), + ) + + if self.callback_threads: + self.log.debug("waiting for global threads...") + self.global_threads.shutdown(wait=True) + + def _set_playbook_name(self, name): + if self.playbook["name"] != name: + self.playbook = self.client.patch("/api/v1/playbooks/%s" % self.playbook["id"], name=name) + + def _set_playbook_labels(self, labels): + # Only update labels if our cache doesn't match + current_labels = [label["name"] for label in self.playbook["labels"]] + if sorted(current_labels) != sorted(labels): + self.log.debug("Updating playbook labels to match: %s" % ",".join(labels)) + self.playbook = self.client.patch("/api/v1/playbooks/%s" % self.playbook["id"], labels=labels) + + def _get_or_create_file(self, path, content=None): + if path not in self.file_cache: + self.log.debug("File not in cache, getting or creating: %s" % path) + for ignored_file_pattern in self.ignored_files: + if ignored_file_pattern in path: + self.log.debug("Ignoring file {1}, matched pattern: {0}".format(ignored_file_pattern, path)) + content = "Not saved by ARA as configured by 'ignored_files'" + if content is None: + try: + with open(path, "r") as fd: + content = fd.read() + except IOError as e: + self.log.error("Unable to open {0} for reading: {1}".format(path, str(e))) + content = """ARA was not able to read this file successfully. + Refer to the logs for more information""" + + self.file_cache[path] = self.client.post( + "/api/v1/files", playbook=self.playbook["id"], path=path, content=content + ) + + return self.file_cache[path] + + def _get_or_create_host(self, host): + # Note: The get_or_create is handled through the serializer of the API server. + if host not in self.host_cache: + self.log.debug("Host not in cache, getting or creating: %s" % host) + self.host_cache[host] = self.client.post("/api/v1/hosts", name=host, playbook=self.playbook["id"]) + return self.host_cache[host] + + def _get_or_create_task(self, task, file_id=None, lineno=None, handler=None): + # Note: The get_or_create is handled through the serializer of the API server. 
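+        # As with play UUIDs above, the task UUID is truncated to 36 characters
+        # (ansible-runner can suffix UUIDs when running in serial) and the
+        # truncated value is also used as the key for the local task cache.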
+ task_uuid = str(task._uuid)[:36] + if task_uuid not in self.task_cache: + if None in (file_id, lineno, handler): + raise ValueError("file_id, lineno, and handler are required to create a task") + + self.log.debug("Task not in cache, getting or creating: %s" % task) + self.task_cache[task_uuid] = self.client.post( + "/api/v1/tasks", + name=task.get_name(), + status="running", + action=task.action, + play=self.play["id"], + playbook=self.playbook["id"], + file=file_id, + tags=task.tags, + lineno=lineno, + handler=handler, + started=datetime.datetime.now(datetime.timezone.utc).isoformat(), + ) + + return self.task_cache[task_uuid] + + def _update_delegation_cache(self, result): + # If the task is a loop and delegate_to is a variable, result._task.delegate_to can return the variable + # instead of it's value when using the v2_runner_on_* hooks. + # We're caching the actual host names here from v2_runner_item_on_* hooks. + # https://github.com/ansible/ansible/issues/75339 + if result._task.delegate_to: + task_uuid = str(result._task._uuid[:36]) + if task_uuid not in self.delegation_cache: + self.delegation_cache[task_uuid] = [] + self.delegation_cache[task_uuid].append(result._task.delegate_to) + + def _load_result(self, result, status, **kwargs): + """ + This method is called when an individual task instance on a single + host completes. It is responsible for logging a single result to the + database. + """ + hostname = result._host.get_name() + self.result_ended[hostname] = datetime.datetime.now(datetime.timezone.utc).isoformat() + + # Retrieve the host so we can associate the result to the host id + host = self._get_or_create_host(hostname) + + # If the task was delegated to another host, retrieve that too. + # Since a single task can be delegated to multiple hosts (ex: looping on a host group and using delegate_to) + # this must be a list of hosts. + delegated_to = [] + # The value of result._task.delegate_to doesn't get templated if the task was skipped + # https://github.com/ansible/ansible/issues/75339#issuecomment-888724838 + if result._task.delegate_to and status != "skipped": + task_uuid = str(result._task._uuid[:36]) + if task_uuid in self.delegation_cache: + for delegated in self.delegation_cache[task_uuid]: + delegated_to.append(self._get_or_create_host(delegated)) + else: + delegated_to.append(self._get_or_create_host(result._task.delegate_to)) + + # Retrieve the task so we can associate the result to the task id + task = self._get_or_create_task(result._task) + + results = strip_internal_keys(module_response_deepcopy(result._result)) + + # Round-trip through JSON to sort keys and convert Ansible types + # to standard types + try: + jsonified = json.dumps(results, cls=AnsibleJSONEncoder, ensure_ascii=False, sort_keys=True) + except TypeError: + # Python 3 can't sort non-homogenous keys. 
+ # https://bugs.python.org/issue25457 + jsonified = json.dumps(results, cls=AnsibleJSONEncoder, ensure_ascii=False, sort_keys=False) + results = json.loads(jsonified) + + # Sanitize facts + if "ansible_facts" in results: + for fact in self.ignored_facts: + if fact in results["ansible_facts"]: + self.log.debug("Ignoring fact: %s" % fact) + results["ansible_facts"][fact] = "Not saved by ARA as configured by 'ignored_facts'" + + self.result = self.client.post( + "/api/v1/results", + playbook=self.playbook["id"], + task=task["id"], + host=host["id"], + delegated_to=[h["id"] for h in delegated_to], + play=task["play"], + content=results, + status=status, + started=self.result_started[hostname] if hostname in self.result_started else task["started"], + ended=self.result_ended[hostname], + changed=result._result.get("changed", False), + # Note: ignore_errors might be None instead of a boolean + ignore_errors=kwargs.get("ignore_errors", False) or False, + ) + + if task["action"] in ["setup", "gather_facts"] and "ansible_facts" in results: + self.client.patch("/api/v1/hosts/%s" % host["id"], facts=results["ansible_facts"]) + + def _load_stats(self, stats): + hosts = sorted(stats.processed.keys()) + for hostname in hosts: + host = self._get_or_create_host(hostname) + host_stats = stats.summarize(hostname) + + self._submit_thread( + "global", + self.client.patch, + "/api/v1/hosts/%s" % host["id"], + changed=host_stats["changed"], + unreachable=host_stats["unreachable"], + failed=host_stats["failures"], + ok=host_stats["ok"], + skipped=host_stats["skipped"], + )
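
For reference, the plugin options documented in the DOCUMENTATION block above can
be set either through the ARA_* environment variables or from an [ara] section in
ansible.cfg. A minimal sketch for reporting to an ARA API server over HTTP; the
server URL and credentials are placeholders, and none of this is needed with the
default offline client:

    [ara]
    api_client = http
    api_server = http://ara.example.org:8000
    api_username = ara
    api_password = changeme
    # 0 (the default) suits the sqlite backend; a few threads help with MySQL/PostgreSQL
    callback_threads = 4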