ansible-keepass/plugins/lookup/keepass.py

397 lines
14 KiB
Python

__metaclass__ = type
import argparse
import getpass
import hashlib
import os
import re
import socket
import subprocess
import sys
import tempfile
import time
import traceback
from ansible.errors import AnsibleError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
from pykeepass import PyKeePass
from pykeepass.exceptions import CredentialsError
DOCUMENTATION = """
lookup: keepass
author: Victor Zemtsov <viczem.dev@gmail.com>
version_added: '0.5.1'
short_description: Fetching data from KeePass file
description:
- This lookup returns a value of a property of a KeePass entry
- which fetched by given path
options:
_terms:
description:
- first is a path to KeePass entry
- second is a property name of the entry, e.g. username or password
required: True
notes:
- https://github.com/viczem/ansible-keepass
examples:
- "{{ lookup('keepass', 'path/to/entry', 'username') }}"
- "{{ lookup('keepass', 'path/to/entry', 'password') }}"
- "{{ lookup('keepass', 'path/to/entry', 'custom_properties', 'my_prop_name') }}"
"""
display = Display()
class LookupModule(LookupBase):
keepass = None
def _var(self, var_value):
return self._templar.template(var_value, fail_on_undefined=True)
def run(self, terms, variables=None, **kwargs):
if not terms:
raise AnsibleError("KeePass: arguments is not set")
if not all(isinstance(_, str) for _ in terms):
raise AnsibleError("KeePass: invalid argument type, all must be string")
if variables is not None:
self._templar.available_variables = variables
variables_ = getattr(self._templar, "_available_variables", {})
# Check keepass database file (required)
var_dbx = self._var(variables_.get("keepass_dbx", ""))
if not var_dbx:
raise AnsibleError("KeePass: 'keepass_dbx' is not set")
var_dbx = os.path.realpath(os.path.expanduser(os.path.expandvars(var_dbx)))
if not os.path.isfile(var_dbx):
raise AnsibleError("KeePass: '%s' is not found" % var_dbx)
# Check key file (optional)
var_key = self._var(variables_.get("keepass_key", ""))
if var_key:
var_key = os.path.realpath(os.path.expanduser(os.path.expandvars(var_key)))
if not os.path.isfile(var_key):
raise AnsibleError("KeePass: '%s' is not found" % var_key)
# Check password (required)
var_psw = self._var(variables_.get("keepass_psw", ""))
if not var_psw:
raise AnsibleError("KeePass: 'keepass_psw' is not set")
# TTL of keepass socket (optional, default: 60 seconds)
var_ttl = self._var(str(variables_.get("keepass_ttl", "60")))
socket_path = _keepass_socket_path(var_dbx)
lock_file_ = socket_path + ".lock"
if not os.path.isfile(lock_file_):
cmd = [
"/usr/bin/env",
"python3",
os.path.abspath(__file__),
var_dbx,
socket_path,
var_ttl,
]
if var_key:
cmd.append("--key=%s" % var_key)
try:
display.v("KeePass: run socket for %s" % var_dbx)
subprocess.Popen(cmd)
except OSError:
os.remove(lock_file_)
raise AnsibleError(traceback.format_exc())
attempts = 10
success = False
for _ in range(attempts):
try:
display.vvv("KeePass: try connect to socket %s/%s" % (_, attempts))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(socket_path)
# send password to the socket for decrypt keepass dbx
display.vvv("KeePass: send password to '%s'" % socket_path)
sock.send(_rq("password", str(var_psw)))
resp = sock.recv(1024).decode().splitlines()
if len(resp) == 2 and resp[0] == "password":
if resp[1] == "0":
success = True
else:
sock.send(_rq("close"))
raise AnsibleError("KeePass: wrong dbx password")
sock.close()
break
except FileNotFoundError:
# wait until the above command open the socket
time.sleep(1)
if not success:
raise AnsibleError("KeePass: socket connection failed for %s" % var_dbx)
display.v("KeePass: open socket for %s -> %s" % (var_dbx, socket_path))
if len(terms) == 1 and terms[0] in ("quit", "exit", "close"):
self._send(socket_path, terms[0], [])
else:
# Fetching data from the keepass socket
return self._send(socket_path, "fetch", terms)
def _send(self, kp_soc, cmd, terms):
display.vvv("KeePass: connect to '%s'" % kp_soc)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(kp_soc)
except FileNotFoundError:
raise AnsibleError("KeePass: '%s' is not found" % kp_soc)
try:
display.vvv("KeePass: %s %s" % (cmd, terms))
sock.send(_rq(cmd, *terms))
resp = sock.recv(1024).decode().splitlines()
resp_len = len(resp)
if resp_len == 0:
raise AnsibleError("KeePass: '%s' result is empty" % cmd)
if resp_len >= 3:
if resp[0] != cmd:
raise AnsibleError(
"KeePass: received command '%s', expected '%s'" % (resp[0], cmd)
)
if resp[1] == "0":
return [os.linesep.join(resp[2:])]
else:
raise AnsibleError("KeePass: '%s' has error '%s'" % (resp[2], cmd))
except Exception as e:
raise AnsibleError(str(e))
finally:
sock.close()
display.vvv("KeePass: disconnect from '%s'" % kp_soc)
def _keepass_socket(kdbx, kdbx_key, sock_path, ttl=60, kdbx_password=None):
"""
:param str kdbx:
:param str kdbx_key:
:param str sock_path:
:param int ttl: in seconds
:return:
Socket messages have multiline format.
First line is a command for both messages are request and response
"""
try:
os.umask(0o177)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind(sock_path)
s.listen(1)
if ttl > 0:
s.settimeout(ttl)
if kdbx_password:
kp = PyKeePass(kdbx, kdbx_password, kdbx_key)
else:
kp = None
is_open = True
while is_open:
conn, addr = s.accept()
with conn:
if ttl > 0:
conn.settimeout(ttl)
while True:
data = conn.recv(1024).decode()
if not data:
break
rq = data.splitlines()
if len(rq) == 0:
conn.send(_resp("", 1, "empty request"))
break
cmd, *arg = rq
arg_len = len(arg)
# CMD: quit | exit | close
if arg_len == 0 and cmd in ("quit", "exit", "close"):
conn.send(_resp(cmd, 0))
conn.close()
is_open = False
break
# CMD: password
if kp is None:
if arg_len == 0:
conn.send(_resp("password", 1))
break
if cmd == "password" and arg[0]:
kp = PyKeePass(kdbx, arg[0], kdbx_key)
conn.send(_resp("password", 0))
break
else:
conn.send(_resp("password", 1))
break
# CMD: fetch
# Read data from decrypted KeePass file
if cmd != "fetch":
conn.send(_resp("fetch", 1, "unknown command '%s'" % cmd))
break
if arg_len == 0:
conn.send(_resp("fetch", 1, "path is not set"))
break
if arg_len == 1:
conn.send(
_resp(
"fetch",
1,
"property name is not set for '%s'" % arg[0],
)
)
break
path = [
_.replace("\\/", "/")
for _ in re.split(r"(?<!\\)/", arg[0])
if _ != ""
]
entry = kp.find_entries_by_path(path, first=True)
if entry is None:
conn.send(
_resp("fetch", 1, "path '%s' is not found" % path)
)
break
prop = arg[1]
if prop == "custom_properties":
if arg_len == 2:
conn.send(
_resp(
"fetch",
1,
"no custom_property key for '%s'" % arg[0],
)
)
break
prop_key = arg[2]
if prop_key not in entry.custom_properties:
conn.send(
_resp(
"fetch",
1,
"custom_property '%s' is not found for '%s'"
"" % (prop_key, path),
)
)
break
conn.send(
_resp(
"fetch",
0,
entry.get_custom_property(prop_key),
)
)
break
if not hasattr(entry, prop):
conn.send(
_resp(
"fetch",
1,
"unknown property '%s' for '%s'" % (prop, path),
)
)
break
conn.send(_resp("fetch", 0, getattr(entry, prop)))
except CredentialsError:
print("%s failed to decrypt" % kdbx)
sys.exit(1)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
except ValueError as e:
print(str(e))
sys.exit(1)
except socket.timeout:
pass
except KeyboardInterrupt:
pass
finally:
if os.path.exists(sock_path):
os.remove(sock_path)
lock_file_ = sock_path + ".lock"
if os.path.isfile(lock_file_):
os.remove(lock_file_)
def _rq(cmd, *arg):
"""Request to keepass socket
:param str cmd: Command name
:param arg: Arguments
"""
return "\n".join((cmd, *arg)).encode()
def _resp(cmd, status_code, payload=""):
"""Response from keepass socket
:param str cmd: Command name
:param int status_code: == 0 - no error; 1 - an error
:param payload: A data from keepass or error description
"""
return "\n".join((cmd, str(status_code), str(payload))).encode()
def _keepass_socket_path(dbx_path):
# UNIX socket path for a dbx (supported multiple dbx)
tempdir = tempfile.gettempdir()
if not os.access(tempdir, os.W_OK):
raise AnsibleError("KeePass: no write permissions to '%s'" % tempdir)
suffix = hashlib.sha1(("%s%s" % (getpass.getuser(), dbx_path)).encode()).hexdigest()
return "%s/ansible-keepass-%s.sock" % (tempdir, suffix[:8])
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("kdbx", type=str)
arg_parser.add_argument("kdbx_sock", type=str, nargs="?", default=None)
arg_parser.add_argument("ttl", type=int, nargs="?", default=0)
arg_parser.add_argument("--key", type=str, nargs="?", default=None)
arg_parser.add_argument("--ask-pass", action="store_true")
args = arg_parser.parse_args()
kdbx = os.path.realpath(os.path.expanduser(os.path.expandvars(args.kdbx)))
if args.key:
key = os.path.realpath(os.path.expanduser(os.path.expandvars(args.key)))
else:
key = None
if args.kdbx_sock:
kdbx_sock = args.kdbx_sock
else:
kdbx_sock = _keepass_socket_path(kdbx)
password = None
if args.ask_pass:
password = getpass.getpass("Password: ")
if isinstance(password, bytes):
password = password.decode(sys.stdin.encoding)
lock_file = kdbx_sock + ".lock"
if not os.path.isfile(lock_file):
open(lock_file, "a").close()
_keepass_socket(kdbx, key, kdbx_sock, args.ttl, password)