community.crypto/plugins/modules/certificate_complete_chain.py

365 lines
13 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: certificate_complete_chain
author: "Felix Fontein (@felixfontein)"
short_description: Complete certificate chain given a set of untrusted and root certificates
description:
- "This module completes a given chain of certificates in PEM format by finding
intermediate certificates from a given set of certificates, until it finds a root
certificate in another given set of certificates."
- "This can for example be used to find the root certificate for a certificate chain
returned by M(community.crypto.acme_certificate)."
- "Note that this module does I(not) check for validity of the chains. It only
checks that issuer and subject match, and that the signature is correct. It
ignores validity dates and key usage completely. If you need to verify that a
generated chain is valid, please use C(openssl verify ...)."
requirements:
- "cryptography >= 1.5"
options:
input_chain:
description:
- A concatenated set of certificates in PEM format forming a chain.
- The module will try to complete this chain.
type: str
required: yes
root_certificates:
description:
- "A list of filenames or directories."
- "A filename is assumed to point to a file containing one or more certificates
in PEM format. All certificates in this file will be added to the set of
root certificates."
- "If a directory name is given, all files in the directory and its
subdirectories will be scanned and tried to be parsed as concatenated
certificates in PEM format."
- "Symbolic links will be followed."
type: list
elements: path
required: yes
intermediate_certificates:
description:
- "A list of filenames or directories."
- "A filename is assumed to point to a file containing one or more certificates
in PEM format. All certificates in this file will be added to the set of
intermediate certificates."
- "If a directory name is given, all files in the directory and its
subdirectories will be scanned and tried to be parsed as concatenated
certificates in PEM format."
- "Symbolic links will be followed."
type: list
elements: path
default: []
'''
EXAMPLES = '''
# Given a leaf certificate for www.ansible.com and one or more intermediate
# certificates, finds the associated root certificate.
- name: Find root certificate
community.crypto.certificate_complete_chain:
input_chain: "{{ lookup('file', '/etc/ssl/csr/www.ansible.com-fullchain.pem') }}"
root_certificates:
- /etc/ca-certificates/
register: www_ansible_com
- name: Write root certificate to disk
copy:
dest: /etc/ssl/csr/www.ansible.com-root.pem
content: "{{ www_ansible_com.root }}"
# Given a leaf certificate for www.ansible.com, and a list of intermediate
# certificates, finds the associated root certificate.
- name: Find root certificate
community.crypto.certificate_complete_chain:
input_chain: "{{ lookup('file', '/etc/ssl/csr/www.ansible.com.pem') }}"
intermediate_certificates:
- /etc/ssl/csr/www.ansible.com-chain.pem
root_certificates:
- /etc/ca-certificates/
register: www_ansible_com
- name: Write complete chain to disk
copy:
dest: /etc/ssl/csr/www.ansible.com-completechain.pem
content: "{{ ''.join(www_ansible_com.complete_chain) }}"
- name: Write root chain (intermediates and root) to disk
copy:
dest: /etc/ssl/csr/www.ansible.com-rootchain.pem
content: "{{ ''.join(www_ansible_com.chain) }}"
'''
RETURN = '''
root:
description:
- "The root certificate in PEM format."
returned: success
type: str
chain:
description:
- "The chain added to the given input chain. Includes the root certificate."
- "Returned as a list of PEM certificates."
returned: success
type: list
elements: str
complete_chain:
description:
- "The completed chain, including leaf, all intermediates, and root."
- "Returned as a list of PEM certificates."
returned: success
type: list
elements: str
'''
import os
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
split_pem_list,
)
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
import cryptography.exceptions
import cryptography.hazmat.backends
import cryptography.hazmat.primitives.serialization
import cryptography.hazmat.primitives.asymmetric.rsa
import cryptography.hazmat.primitives.asymmetric.ec
import cryptography.hazmat.primitives.asymmetric.padding
import cryptography.hazmat.primitives.hashes
import cryptography.hazmat.primitives.asymmetric.utils
import cryptography.x509
import cryptography.x509.oid
HAS_CRYPTOGRAPHY = (LooseVersion(cryptography.__version__) >= LooseVersion('1.5'))
_cryptography_backend = cryptography.hazmat.backends.default_backend()
except ImportError as dummy:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
HAS_CRYPTOGRAPHY = False
class Certificate(object):
    '''
    Pairs the PEM text of a certificate with its parsed representation.

    ``pem`` is stored with a trailing newline appended unless it already ends
    with a line feed or carriage return; ``cert`` is stored unchanged.
    '''

    def __init__(self, pem, cert):
        # Guarantee a line terminator at the end of the PEM text.
        self.pem = pem if pem.endswith(('\n', '\r')) else pem + '\n'
        self.cert = cert
def is_parent(module, cert, potential_parent):
    '''
    Tests whether the given certificate has been issued by the potential parent certificate.

    The issuer name of ``cert`` must equal the subject name of
    ``potential_parent``, and the signature of ``cert`` must verify against the
    public key of ``potential_parent``. Only RSA keys (with PKCS#1 v1.5
    padding) and elliptic curve keys (ECDSA) are handled.

    :param module: object providing ``warn()`` and ``fail_json()``.
    :param cert: ``Certificate`` whose issuer is being looked for.
    :param potential_parent: ``Certificate`` that might have issued ``cert``.
    :returns: ``True`` if issuer and signature match, ``False`` otherwise
        (including unknown key types and unsupported algorithms, for which a
        warning is emitted). Any other error during signature validation is
        reported through ``module.fail_json``.
    '''
    child = cert.cert
    # Check issuer
    if child.issuer != potential_parent.cert.subject:
        return False
    # Check signature
    public_key = potential_parent.cert.public_key()
    try:
        if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey):
            public_key.verify(
                child.signature,
                child.tbs_certificate_bytes,
                cryptography.hazmat.primitives.asymmetric.padding.PKCS1v15(),
                child.signature_hash_algorithm
            )
        elif isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
            public_key.verify(
                child.signature,
                child.tbs_certificate_bytes,
                cryptography.hazmat.primitives.asymmetric.ec.ECDSA(child.signature_hash_algorithm),
            )
        else:
            # Unknown public key type
            module.warn('Unknown public key type "{0}"'.format(public_key))
            return False
        return True
    except cryptography.exceptions.InvalidSignature:
        return False
    except cryptography.exceptions.UnsupportedAlgorithm as exc:
        # Report the exception itself. Reading ``signature_hash_algorithm``
        # again here is unsafe: that attribute access is evaluated inside the
        # ``try`` above and may be what raised, in which case touching it in
        # the handler would raise a second, uncaught exception.
        module.warn('Unsupported algorithm "{0}"'.format(exc))
        return False
    except Exception as e:
        module.fail_json(msg='Unknown error on signature validation: {0}'.format(e))
def parse_PEM_list(module, text, source, fail_on_error=True):
    '''
    Parse concatenated PEM certificates. Return list of ``Certificate`` objects.

    :param module: object providing ``warn()`` and ``fail_json()``.
    :param text: text containing zero or more PEM blocks.
    :param source: description of where ``text`` came from; only used in
        error messages.
    :param fail_on_error: if true, a certificate that cannot be parsed is
        reported through ``module.fail_json``; otherwise a warning is emitted
        and the certificate is skipped.
    :returns: list of ``Certificate`` objects for the successfully parsed
        PEM blocks, in input order.
    '''
    result = []
    # Number certificates by their position in the input, not by how many
    # were parsed successfully so far; otherwise every failure following an
    # earlier skipped certificate would be reported under the wrong number.
    for index, cert_pem in enumerate(split_pem_list(text), start=1):
        # Try to load PEM certificate
        try:
            cert = cryptography.x509.load_pem_x509_certificate(to_bytes(cert_pem), _cryptography_backend)
            result.append(Certificate(cert_pem, cert))
        except Exception as e:
            msg = 'Cannot parse certificate #{0} from {1}: {2}'.format(index, source, e)
            if fail_on_error:
                module.fail_json(msg=msg)
            else:
                module.warn(msg)
    return result
def load_PEM_list(module, path, fail_on_error=True):
    '''
    Read a file of concatenated PEM certificates and parse it.

    The file is read in binary mode and decoded as UTF-8 before being handed
    to ``parse_PEM_list``. Return the resulting list of ``Certificate``
    objects. If anything goes wrong, the problem is reported through
    ``module.fail_json`` when ``fail_on_error`` is true; otherwise a warning
    is emitted and an empty list is returned.
    '''
    try:
        with open(path, "rb") as handle:
            raw = handle.read()
        text = raw.decode('utf-8')
        return parse_PEM_list(module, text, source=path, fail_on_error=fail_on_error)
    except Exception as exc:
        message = 'Cannot read certificate file {0}: {1}'.format(path, exc)
        if not fail_on_error:
            module.warn(message)
            return []
        module.fail_json(msg=message)
class CertificateSet(object):
    '''
    Collection of certificates that can be searched for the issuer of a
    given certificate.
    '''

    def __init__(self, module):
        self.module = module
        # All loaded ``Certificate`` objects.
        self.certificates = set()
        # Maps a subject name to the loaded certificates carrying that subject,
        # so that looking up a certificate's issuer name yields its candidates.
        self.certificates_by_issuer = {}
        # Maps a parsed certificate to its ``Certificate`` wrapper.
        self.certificate_by_cert = {}

    def _load_file(self, path):
        # Unparsable or unreadable files only produce warnings here.
        for entry in load_PEM_list(self.module, path, fail_on_error=False):
            self.certificates.add(entry)
            self.certificates_by_issuer.setdefault(entry.cert.subject, []).append(entry)
            self.certificate_by_cert[entry.cert] = entry

    def load(self, path):
        '''
        Load lists of PEM certificates from a file or a directory.

        A directory is walked recursively, following symbolic links, and
        every file found in it is loaded.
        '''
        b_path = to_bytes(path, errors='surrogate_or_strict')
        if not os.path.isdir(b_path):
            self._load_file(b_path)
            return
        for directory, dummy, files in os.walk(b_path, followlinks=True):
            for name in files:
                self._load_file(os.path.join(directory, name))

    def find_parent(self, cert):
        '''
        Search for the parent (issuer) of a certificate. Return ``None`` if none was found.
        '''
        for candidate in self.certificates_by_issuer.get(cert.cert.issuer, []):
            if is_parent(self.module, cert, candidate):
                return candidate
        return None
def format_cert(cert):
    '''
    Produce the string form of the wrapped parsed certificate, for use in
    error messages.
    '''
    parsed = cert.cert
    return str(parsed)
def check_cycle(module, occured_certificates, next):
    '''
    Record the parsed certificate of ``next`` in ``occured_certificates``.

    If it is already present, a cycle has been found and this is reported
    through ``module.fail_json``.
    '''
    candidate = next.cert
    already_seen = candidate in occured_certificates
    if already_seen:
        module.fail_json(msg='Found cycle while building certificate chain')
    occured_certificates.add(candidate)
def main():
    '''
    Module entry point.

    Parses and checks the input chain, loads the intermediate and root
    certificate sets, extends the chain until a certificate issued by one of
    the roots is reached, and returns the root, the added chain and the
    complete chain as PEM.
    '''
    argument_spec = dict(
        input_chain=dict(type='str', required=True),
        root_certificates=dict(type='list', required=True, elements='path'),
        intermediate_certificates=dict(type='list', default=[], elements='path'),
    )
    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)

    if not HAS_CRYPTOGRAPHY:
        module.fail_json(msg=missing_required_lib('cryptography >= 1.5'), exception=CRYPTOGRAPHY_IMP_ERR)

    # Load chain
    chain = parse_PEM_list(module, module.params['input_chain'], source='input chain')
    if not chain:
        module.fail_json(msg='Input chain must contain at least one certificate')

    # Check chain: each certificate must have been issued by the one after it
    for position in range(1, len(chain)):
        child = chain[position - 1]
        parent = chain[position]
        if is_parent(module, child, parent):
            continue
        module.fail_json(msg=('Cannot verify input chain: certificate #{2}: {3} is not issuer ' +
                              'of certificate #{0}: {1}').format(position, format_cert(child), position + 1, format_cert(parent)))

    # Load intermediate certificates first, then root certificates
    intermediates = CertificateSet(module)
    roots = CertificateSet(module)
    for certificate_set, option in ((intermediates, 'intermediate_certificates'), (roots, 'root_certificates')):
        for path in module.params[option]:
            certificate_set.load(path)

    # Try to complete chain
    completed = []
    occured_certificates = set(entry.cert for entry in chain)
    tail = chain[-1]
    # Do not try to complete the chain when it's already ending with a root certificate
    current = None if tail.cert in roots.certificate_by_cert else tail
    while current is not None:
        # A root issuer ends the search
        root = roots.find_parent(current)
        if root:
            check_cycle(module, occured_certificates, root)
            completed.append(root)
            break
        # Otherwise continue via an intermediate issuer, if there is one
        intermediate = intermediates.find_parent(current)
        if intermediate:
            check_cycle(module, occured_certificates, intermediate)
            completed.append(intermediate)
            current = intermediate
        else:
            module.fail_json(msg='Cannot complete chain. Stuck at certificate {0}'.format(format_cert(current)))

    # Return results
    complete_chain = chain + completed
    result = dict(
        changed=False,
        root=complete_chain[-1].pem,
        chain=[entry.pem for entry in completed],
        complete_chain=[entry.pem for entry in complete_chain],
    )
    module.exit_json(**result)
# Run the module only when this file is executed as a script.
if __name__ == '__main__':
    main()