# Copyright (c) 2018 Red Hat, Inc.
#
# This file is part of ARA: Ansible Run Analysis.
#
# ARA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ARA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ARA. If not, see <https://www.gnu.org/licenses/>.

from __future__ import absolute_import, division, print_function

import datetime
import json
import logging
import os
import socket
from concurrent.futures import ThreadPoolExecutor

from ansible import __version__ as ansible_version
from ansible.parsing.ajson import AnsibleJSONEncoder
from ansible.plugins.callback import CallbackBase
from ansible.vars.clean import module_response_deepcopy, strip_internal_keys

from ara.clients import utils as client_utils

# Ansible CLI options are now in ansible.context in >= 2.8
# https://github.com/ansible/ansible/commit/afdbb0d9d5bebb91f632f0d4a1364de5393ba17a
try:
    from ansible import context

    cli_options = {key: value for key, value in context.CLIARGS.items()}
except ImportError:
    # < 2.8 doesn't have ansible.context
    try:
        from __main__ import cli

        cli_options = cli.options.__dict__
    except ImportError:
        # using API without CLI
        cli_options = {}


DOCUMENTATION = """
callback: ara
callback_type: notification
requirements:
  - ara
short_description: Sends playbook execution data to the ARA API internally or over HTTP
description:
  - Sends playbook execution data to the ARA API internally or over HTTP
options:
  api_client:
    description: The client to use for communicating with the API
    default: offline
    env:
      - name: ARA_API_CLIENT
    ini:
      - section: ara
        key: api_client
    choices: ['offline', 'http']
  api_server:
    description: When using the HTTP client, the base URL to the ARA API server
    default: http://127.0.0.1:8000
    env:
      - name: ARA_API_SERVER
    ini:
      - section: ara
        key: api_server
  api_username:
    description: If authentication is required, the username to authenticate with
    default: null
    env:
      - name: ARA_API_USERNAME
    ini:
      - section: ara
        key: api_username
  api_password:
    description: If authentication is required, the password to authenticate with
    default: null
    env:
      - name: ARA_API_PASSWORD
    ini:
      - section: ara
        key: api_password
  api_insecure:
    description: Can be enabled to ignore SSL certification of the API server
    type: bool
    default: false
    env:
      - name: ARA_API_INSECURE
    ini:
      - section: ara
        key: api_insecure
  api_timeout:
    description: Timeout, in seconds, before giving up on HTTP requests
    type: integer
    default: 30
    env:
      - name: ARA_API_TIMEOUT
    ini:
      - section: ara
        key: api_timeout
  argument_labels:
    description: |
      A list of CLI arguments that, if set, will be automatically applied to playbooks as labels.
      Note that CLI arguments are not always named the same as how they are represented by Ansible.
      For example, --limit is "subset", --user is "remote_user" but --check is "check".
    type: list
    default:
      - remote_user
      - check
      - tags
      - skip_tags
      - subset
    env:
      - name: ARA_ARGUMENT_LABELS
    ini:
      - section: ara
        key: argument_labels
  default_labels:
    description: A list of default labels that will be applied to playbooks
    type: list
    default: []
    env:
      - name: ARA_DEFAULT_LABELS
    ini:
      - section: ara
        key: default_labels
  ignored_facts:
    description: List of host facts that will not be saved by ARA
    type: list
    default: ["ansible_env"]
    env:
      - name: ARA_IGNORED_FACTS
    ini:
      - section: ara
        key: ignored_facts
  ignored_arguments:
    description: List of Ansible arguments that will not be saved by ARA
    type: list
    default: ["extra_vars"]
    env:
      - name: ARA_IGNORED_ARGUMENTS
    ini:
      - section: ara
        key: ignored_arguments
  ignored_files:
    description: List of patterns that will not be saved by ARA
    type: list
    default: []
    env:
      - name: ARA_IGNORED_FILES
    ini:
      - section: ara
        key: ignored_files
  callback_threads:
    description:
      - The number of threads to use in API client thread pools
      - When set to 0, no threading will be used (default) which is appropriate for usage with sqlite
      - Using threads is recommended when the server is using MySQL or PostgreSQL
    type: integer
    default: 0
    env:
      - name: ARA_CALLBACK_THREADS
    ini:
      - section: ara
        key: callback_threads
"""
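
# Illustrative sketch (not used by the plugin itself): one way the options documented above can be
# wired up through ansible.cfg. The callback_plugins line assumes ara is installed where
# "python3 -m ara.setup.callback_plugins" can print the plugin directory; adjust it to your
# installation. The [ara] keys and defaults come straight from the DOCUMENTATION block.
#
#   [defaults]
#   callback_plugins = $(python3 -m ara.setup.callback_plugins)
#
#   [ara]
#   api_client = http
#   api_server = http://127.0.0.1:8000
#   api_timeout = 30
#   callback_threads = 4
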

class CallbackModule(CallbackBase):
    """
    Saves data from an Ansible run into a database
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = "awesome"
    CALLBACK_NAME = "ara_default"

    def __init__(self):
        super(CallbackModule, self).__init__()
        self.log = logging.getLogger("ara.plugins.callback.default")

        # These are configured in self.set_options
        self.client = None
        self.callback_threads = None
        self.ignored_facts = []
        self.ignored_arguments = []
        self.ignored_files = []

        self.result = None
        self.result_started = {}
        self.result_ended = {}
        self.task = None
        self.play = None
        self.playbook = None
        self.stats = None
        self.file_cache = {}
        self.host_cache = {}
        self.task_cache = {}
        self.delegation_cache = {}

    def set_options(self, task_keys=None, var_options=None, direct=None):
        super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct)

        self.argument_labels = self.get_option("argument_labels")
        self.default_labels = self.get_option("default_labels")
        self.ignored_facts = self.get_option("ignored_facts")
        self.ignored_arguments = self.get_option("ignored_arguments")
        self.ignored_files = self.get_option("ignored_files")

        client = self.get_option("api_client")
        endpoint = self.get_option("api_server")
        timeout = self.get_option("api_timeout")
        username = self.get_option("api_username")
        password = self.get_option("api_password")
        insecure = self.get_option("api_insecure")
        self.client = client_utils.get_client(
            client=client,
            endpoint=endpoint,
            timeout=timeout,
            username=username,
            password=password,
            verify=False if insecure else True,
        )

        # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter.
        # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE.
        # Otherwise we can hit "urllib3.connectionpool: Connection pool is full"
        self.callback_threads = self.get_option("callback_threads")
        if self.callback_threads > 4:
            self.callback_threads = 4

    def _submit_thread(self, threadpool, func, *args, **kwargs):
        # Manages whether or not the function should be threaded to keep things DRY
        if self.callback_threads:
            # Pick from one of two thread pools (global or task)
            threads = getattr(self, threadpool + "_threads")
            threads.submit(func, *args, **kwargs)
        else:
            func(*args, **kwargs)

    def v2_playbook_on_start(self, playbook):
        self.log.debug("v2_playbook_on_start")

        if self.callback_threads:
            self.global_threads = ThreadPoolExecutor(max_workers=self.callback_threads)
            self.log.debug("Global thread pool initialized with %s thread(s)" % self.callback_threads)

        content = None

        if playbook._file_name == "__adhoc_playbook__":
            content = cli_options["module_name"]
            if cli_options["module_args"]:
                content = "{0}: {1}".format(content, cli_options["module_args"])
            path = "Ad-Hoc: {0}".format(content)
        else:
            path = os.path.abspath(playbook._file_name)

        # Potentially sanitize some user-specified keys
        for argument in self.ignored_arguments:
            if argument in cli_options:
                self.log.debug("Ignoring argument: %s" % argument)
                cli_options[argument] = "Not saved by ARA as configured by 'ignored_arguments'"

        # Retrieve and format CLI options for argument labels
        argument_labels = []
        for label in self.argument_labels:
            if label in cli_options:
                # Some arguments are lists or tuples
                if isinstance(cli_options[label], tuple) or isinstance(cli_options[label], list):
                    # Only label these if they're not empty
                    if cli_options[label]:
                        argument_labels.append("%s:%s" % (label, ",".join(cli_options[label])))
                # Some arguments are booleans
                elif isinstance(cli_options[label], bool):
                    argument_labels.append("%s:%s" % (label, cli_options[label]))
                # The rest can be printed as-is if there is something set
                elif cli_options[label]:
                    argument_labels.append("%s:%s" % (label, cli_options[label]))
        self.argument_labels = argument_labels

        # Create the playbook
        self.playbook = self.client.post(
            "/api/v1/playbooks",
            ansible_version=ansible_version,
            arguments=cli_options,
            status="running",
            path=path,
            controller=socket.getfqdn(),
            started=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )

        # Record the playbook file
        self._submit_thread("global", self._get_or_create_file, path, content)

        return self.playbook

    def v2_playbook_on_play_start(self, play):
        self.log.debug("v2_playbook_on_play_start")
        self._end_task()
        self._end_play()

        # Load variables to verify if there is anything relevant for ara
        play_vars = play._variable_manager.get_vars(play=play)["vars"]
        if "ara_playbook_name" in play_vars:
            self._submit_thread("global", self._set_playbook_name, play_vars["ara_playbook_name"])

        labels = self.default_labels + self.argument_labels
        if "ara_playbook_labels" in play_vars:
            # ara_playbook_labels can be supplied as a list inside a playbook
            # but it might also be specified as a comma separated string when
            # using extra-vars
            if isinstance(play_vars["ara_playbook_labels"], list):
                labels.extend(play_vars["ara_playbook_labels"])
            elif isinstance(play_vars["ara_playbook_labels"], str):
                labels.extend(play_vars["ara_playbook_labels"].split(","))
            else:
                raise TypeError("ara_playbook_labels must be a list or a comma-separated string")
        if labels:
            self._submit_thread("global", self._set_playbook_labels, labels)

        # Record all the files involved in the play
        for path in play._loader._FILE_CACHE.keys():
            self._submit_thread("global", self._get_or_create_file, path)

        # Note: ansible-runner suffixes play UUIDs when running in serial so 34cff6f4-9f8e-6137-3461-000000000005 can
        # end up being 34cff6f4-9f8e-6137-3461-000000000005_2. Remove anything beyond standard 36 character UUIDs.
        # https://github.com/ansible-community/ara/issues/211
        # Create the play
        self.play = self.client.post(
            "/api/v1/plays",
            name=play.name,
            status="running",
            uuid=play._uuid[:36],
            playbook=self.playbook["id"],
            started=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )

        return self.play
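
    # Illustrative sketch (not executed by this plugin): the variables read above can be set
    # directly in a playbook, with placeholder values, e.g.
    #
    #   - hosts: all
    #     vars:
    #       ara_playbook_name: "deploy webservers"
    #       ara_playbook_labels: ["deploy", "prod"]
    #
    # or passed as a comma-separated string via --extra-vars, e.g. ara_playbook_labels=deploy,prod.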

    def v2_playbook_on_handler_task_start(self, task):
        self.log.debug("v2_playbook_on_handler_task_start")
        # TODO: Why doesn't `v2_playbook_on_handler_task_start` have is_conditional ?
        self.v2_playbook_on_task_start(task, False, handler=True)

    def v2_playbook_on_task_start(self, task, is_conditional, handler=False):
        self.log.debug("v2_playbook_on_task_start")
        self._end_task()

        if self.callback_threads:
            self.task_threads = ThreadPoolExecutor(max_workers=self.callback_threads)
            self.log.debug("Task thread pool initialized with %s thread(s)" % self.callback_threads)

        pathspec = task.get_path()
        if pathspec:
            path, lineno = pathspec.split(":", 1)
            lineno = int(lineno)
        else:
            # Task doesn't have a path, default to "something"
            path = self.playbook["path"]
            lineno = 1

        # Get task file
        task_file = self._get_or_create_file(path)

        # Get task
        self.task = self._get_or_create_task(task, task_file["id"], lineno, handler)

        return self.task

    def v2_runner_on_start(self, host, task):
        # v2_runner_on_start was added in 2.8 so this doesn't get run for Ansible 2.7 and below.
        self.result_started[host.get_name()] = datetime.datetime.now(datetime.timezone.utc).isoformat()

    def v2_runner_on_ok(self, result, **kwargs):
        self._submit_thread("task", self._load_result, result, "ok", **kwargs)

    def v2_runner_on_unreachable(self, result, **kwargs):
        self._submit_thread("task", self._load_result, result, "unreachable", **kwargs)

    def v2_runner_on_failed(self, result, **kwargs):
        self._submit_thread("task", self._load_result, result, "failed", **kwargs)

    def v2_runner_on_skipped(self, result, **kwargs):
        self._submit_thread("task", self._load_result, result, "skipped", **kwargs)

    def v2_runner_item_on_ok(self, result):
        self._update_delegation_cache(result)

    def v2_runner_item_on_failed(self, result):
        self._update_delegation_cache(result)

    def v2_runner_item_on_skipped(self, result):
        pass
        # result._task.delegate_to can end up being a variable from this hook, don't save it.
        # https://github.com/ansible/ansible/issues/75339
        # self._update_delegation_cache(result)

    def v2_playbook_on_stats(self, stats):
        self.log.debug("v2_playbook_on_stats")
        self._end_task()
        self._end_play()
        self._load_stats(stats)
        self._end_playbook(stats)

    def _end_task(self):
        if self.task is not None:
            self._submit_thread(
                "task",
                self.client.patch,
                "/api/v1/tasks/%s" % self.task["id"],
                status="completed",
                ended=datetime.datetime.now(datetime.timezone.utc).isoformat(),
            )

            if self.callback_threads:
                # Flush threads before moving on to next task to make sure all results are saved
                self.log.debug("waiting for task threads...")
                self.task_threads.shutdown(wait=True)
                self.task_threads = None

            self.task = None

    def _end_play(self):
        if self.play is not None:
            self._submit_thread(
                "global",
                self.client.patch,
                "/api/v1/plays/%s" % self.play["id"],
                status="completed",
                ended=datetime.datetime.now(datetime.timezone.utc).isoformat(),
            )

            self.play = None

    def _end_playbook(self, stats):
        status = "unknown"
        if len(stats.failures) >= 1 or len(stats.dark) >= 1:
            status = "failed"
        else:
            status = "completed"

        self._submit_thread(
            "global",
            self.client.patch,
            "/api/v1/playbooks/%s" % self.playbook["id"],
            status=status,
            ended=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )

        if self.callback_threads:
            self.log.debug("waiting for global threads...")
            self.global_threads.shutdown(wait=True)

    def _set_playbook_name(self, name):
        if self.playbook["name"] != name:
            self.playbook = self.client.patch("/api/v1/playbooks/%s" % self.playbook["id"], name=name)

    def _set_playbook_labels(self, labels):
        # Only update labels if our cache doesn't match
        current_labels = [label["name"] for label in self.playbook["labels"]]
        if sorted(current_labels) != sorted(labels):
            self.log.debug("Updating playbook labels to match: %s" % ",".join(labels))
            self.playbook = self.client.patch("/api/v1/playbooks/%s" % self.playbook["id"], labels=labels)

    def _get_or_create_file(self, path, content=None):
        if path not in self.file_cache:
            self.log.debug("File not in cache, getting or creating: %s" % path)
            for ignored_file_pattern in self.ignored_files:
                if ignored_file_pattern in path:
                    self.log.debug("Ignoring file {1}, matched pattern: {0}".format(ignored_file_pattern, path))
                    content = "Not saved by ARA as configured by 'ignored_files'"
            if content is None:
                try:
                    with open(path, "r") as fd:
                        content = fd.read()
                except IOError as e:
                    self.log.error("Unable to open {0} for reading: {1}".format(path, str(e)))
                    content = """ARA was not able to read this file successfully. Refer to the logs for more information"""

            self.file_cache[path] = self.client.post(
                "/api/v1/files", playbook=self.playbook["id"], path=path, content=content
            )

        return self.file_cache[path]

    def _get_or_create_host(self, host):
        # Note: The get_or_create is handled through the serializer of the API server.
        if host not in self.host_cache:
            self.log.debug("Host not in cache, getting or creating: %s" % host)
            self.host_cache[host] = self.client.post("/api/v1/hosts", name=host, playbook=self.playbook["id"])
        return self.host_cache[host]
    def _get_or_create_task(self, task, file_id=None, lineno=None, handler=None):
        # Note: The get_or_create is handled through the serializer of the API server.
        task_uuid = str(task._uuid)[:36]
        if task_uuid not in self.task_cache:
            if None in (file_id, lineno, handler):
                raise ValueError("file_id, lineno, and handler are required to create a task")
            self.log.debug("Task not in cache, getting or creating: %s" % task)
            self.task_cache[task_uuid] = self.client.post(
                "/api/v1/tasks",
                name=task.get_name(),
                status="running",
                action=task.action,
                play=self.play["id"],
                playbook=self.playbook["id"],
                file=file_id,
                tags=task.tags,
                lineno=lineno,
                handler=handler,
                started=datetime.datetime.now(datetime.timezone.utc).isoformat(),
            )
        return self.task_cache[task_uuid]

    def _update_delegation_cache(self, result):
        # If the task is a loop and delegate_to is a variable, result._task.delegate_to can return the variable
        # instead of its value when using the v2_runner_on_* hooks.
        # We're caching the actual host names here from v2_runner_item_on_* hooks.
        # https://github.com/ansible/ansible/issues/75339
        if result._task.delegate_to:
            task_uuid = str(result._task._uuid[:36])
            if task_uuid not in self.delegation_cache:
                self.delegation_cache[task_uuid] = []
            self.delegation_cache[task_uuid].append(result._task.delegate_to)

    def _load_result(self, result, status, **kwargs):
        """
        This method is called when an individual task instance on a single host completes.
        It is responsible for logging a single result to the database.
        """
        hostname = result._host.get_name()
        self.result_ended[hostname] = datetime.datetime.now(datetime.timezone.utc).isoformat()

        # Retrieve the host so we can associate the result to the host id
        host = self._get_or_create_host(hostname)

        # If the task was delegated to another host, retrieve that too.
        # Since a single task can be delegated to multiple hosts (ex: looping on a host group and using delegate_to)
        # this must be a list of hosts.
        delegated_to = []
        # The value of result._task.delegate_to doesn't get templated if the task was skipped
        # https://github.com/ansible/ansible/issues/75339#issuecomment-888724838
        if result._task.delegate_to and status != "skipped":
            task_uuid = str(result._task._uuid[:36])
            if task_uuid in self.delegation_cache:
                for delegated in self.delegation_cache[task_uuid]:
                    delegated_to.append(self._get_or_create_host(delegated))
            else:
                delegated_to.append(self._get_or_create_host(result._task.delegate_to))

        # Retrieve the task so we can associate the result to the task id
        task = self._get_or_create_task(result._task)

        results = strip_internal_keys(module_response_deepcopy(result._result))

        # Round-trip through JSON to sort keys and convert Ansible types
        # to standard types
        try:
            jsonified = json.dumps(results, cls=AnsibleJSONEncoder, ensure_ascii=False, sort_keys=True)
        except TypeError:
            # Python 3 can't sort non-homogeneous keys.
            # https://bugs.python.org/issue25457
            jsonified = json.dumps(results, cls=AnsibleJSONEncoder, ensure_ascii=False, sort_keys=False)
        results = json.loads(jsonified)

        # Sanitize facts
        if "ansible_facts" in results:
            for fact in self.ignored_facts:
                if fact in results["ansible_facts"]:
                    self.log.debug("Ignoring fact: %s" % fact)
                    results["ansible_facts"][fact] = "Not saved by ARA as configured by 'ignored_facts'"

        self.result = self.client.post(
            "/api/v1/results",
            playbook=self.playbook["id"],
            task=task["id"],
            host=host["id"],
            delegated_to=[h["id"] for h in delegated_to],
            play=task["play"],
            content=results,
            status=status,
            started=self.result_started[hostname] if hostname in self.result_started else task["started"],
            ended=self.result_ended[hostname],
            changed=result._result.get("changed", False),
            # Note: ignore_errors might be None instead of a boolean
            ignore_errors=kwargs.get("ignore_errors", False) or False,
        )

        if task["action"] in ["setup", "gather_facts"] and "ansible_facts" in results:
            self.client.patch("/api/v1/hosts/%s" % host["id"], facts=results["ansible_facts"])

    def _load_stats(self, stats):
        hosts = sorted(stats.processed.keys())
        for hostname in hosts:
            host = self._get_or_create_host(hostname)
            host_stats = stats.summarize(hostname)
            self._submit_thread(
                "global",
                self.client.patch,
                "/api/v1/hosts/%s" % host["id"],
                changed=host_stats["changed"],
                unreachable=host_stats["unreachable"],
                failed=host_stats["failures"],
                ok=host_stats["ok"],
                skipped=host_stats["skipped"],
            )
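

# Illustrative sketch, not part of the plugin: a minimal way to read back the data recorded by the
# calls above. It assumes a reachable API server, that the client returned by client_utils.get_client()
# exposes a get() counterpart to the post()/patch() calls used throughout this module, and that the
# list endpoint returns a paginated dict with a "results" key. The endpoint and credentials are
# placeholders; the block only runs when this file is executed directly, never when Ansible loads
# the callback.
if __name__ == "__main__":
    example_client = client_utils.get_client(
        client="http",
        endpoint="http://127.0.0.1:8000",
        timeout=30,
        username=None,
        password=None,
        verify=True,
    )
    # List recorded playbooks; each entry mirrors what self.client.post("/api/v1/playbooks", ...) created.
    playbooks = example_client.get("/api/v1/playbooks")
    for playbook in playbooks.get("results", []):
        print("{0}: {1} ({2})".format(playbook["id"], playbook["path"], playbook["status"]))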