Add gpsd_ plugin

This is a multigraph plugin which graphs various stats from gpsd. Only
enough statistics are graphed to show the _quality_ of the fix, not the
actual fix itself (attempting to protect privacy).

GPSd will report various messages asynchronously (that is, the messages
may come in any order) so, during the collect phase, we emit values as
and when the messages arrive. This should provide the most expedient
response.
This commit is contained in:
darac 2022-10-06 21:45:20 +01:00 committed by Kenyon Ralph
parent c2c540b737
commit 00a3d064ae
No known key found for this signature in database
GPG Key ID: 98FF3EF9C9B912D5
5 changed files with 362 additions and 0 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

362
plugins/gpsd/gpsd_ Executable file
View File

@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""
=head1 NAME
gpsd_ - Munin plugin to show GPSd quality
=head1 CONFIGURATION
Requires Python3 and the C<gpsdclient> library.
You should specify the host which gpsd is running on by
creating a symlink to this file, e.g.:
ln -s /path/to/gps_ /etc/munin/plugins/gpsd_localhost
=head1 ENVIRONMENT VARIABLES
env.gpsd_collect_time Maximum time to collect data for (in seconds, default 5)
=head1 AUTHOR
Paul Saunders <darac+munin@darac.org.uk>
=head1 MAGIC MARKERS
#%# family=contrib
#%# capabilities=multigraph
=cut
"""
import os
import sys
import colorsys
import datetime
# Helper functions
def debug(msg):
if os.environ.get("MUNIN_DEBUG", "0") == "1":
print(f"# {msg}", file=sys.stderr)
def rgb_to_hex(rgb):
return "%02X%02X%02X" % tuple(map(int, rgb))
def hex_to_rgb(hexa):
return tuple(int(hexa[i : i + 2], 16) for i in (0, 2, 4))
def desaturate(hexa):
debug(f"Desaturating {hexa} ({hex_to_rgb(hexa)})")
(h, _, v) = colorsys.rgb_to_hsv(*hex_to_rgb(hexa))
debug(f"Got ({h}, _, {v})")
return rgb_to_hex(colorsys.hsv_to_rgb(h, 1 / 3, v))
def need_multigraph():
print(f"graph_title {sys.argv[0]} needs MULTIGRAPH")
print(
"graph_info This plugin requires multigraph capabilities,"
" but this node does not support it"
)
print("graph_total total")
sys.exit(1)
# The main functionlity
def config():
"""
Note that we don't graph all values that GPSd could give us.
For example, graphing position could be a security issue
(revealing the physical location of an internet host). However,
graphing things like the _precision_ of position and the number
of visible satellites (without listing which satellites) _should_
give a reasonable balance between security and usability.
"""
##################
# GPS Satellites #
##################
print("multigraph gps_satellites")
print("graph_title GPS Satellites")
print("graph_vlabel # in view")
print("graph_category gnss")
print("graph_printf %4.0lf") # Integers only
graph_order = []
for constellation in sorted(
GPS_CONSTELLATIONS, key=lambda item: item.get("gnssid")
):
fieldname = constellation.get("name").lower()
# Start with the used sats
graph_order.append(f"{fieldname}_used")
print(f"{fieldname}_used.label {constellation['name']:7} (Used)")
print(f"{fieldname}_used.draw AREASTACK")
print(f"{fieldname}_used.type GAUGE")
print(
f"{fieldname}_used.info The number of {constellation['name']}"
f" used by the solution"
)
print(f"{fieldname}_used.colour {constellation['colour']}")
# And now the unused sats
graph_order.append(f"{fieldname}")
print(f"{fieldname}.label {constellation['name']:7}")
print(f"{fieldname}.draw AREASTACK")
print(f"{fieldname}.type GAUGE")
print(f"{fieldname}.info {constellation['info']}")
print(f"{fieldname}.colour {desaturate(constellation['colour'])}")
# Finally, create a summation line
graph_order.append("total")
print("total.label Total")
print("total.info Total number of satellites reported by GPSd")
print("total.type GAUGE")
print("total.draw LINE")
print("total.colour 000000")
#################
# GPS Precision #
#################
print("multigraph gps_precision")
print("graph_title GPS Precision")
print("graph_vlabel DOP")
print("graph_category gnss")
print("graph_args -l 0")
print(
"graph_info <ul><li>&lt;=1: Ideal</li>"
"<li>1-2: Excellent</li>"
"<li>2-5: Good</li>"
"<li>5-10: Moderate</li>"
"<li>10-20: Fair</li>"
"<li>&gt;20: Poor</li></ul>"
)
for dop in GPS_PRECISIONS.keys():
fieldname = dop.lower()
print(f"{fieldname}.label {GPS_PRECISIONS[dop]}")
print(f"{fieldname}.draw LINE")
print(f"{fieldname}.type GAUGE")
print(f"{fieldname}.info {GPS_PRECISIONS[dop]} Dilution of Precision")
print(f"{fieldname}.warning :10")
print(f"{fieldname}.critical :20")
#######################
# GPS Error Estimates #
#######################
print("multigraph gps_error_estimate")
print("graph_title GPS Error Estimates")
print("graph_vlabel estimated error")
print("graph_category gnss")
for error in GPS_ERRORS:
fieldname = error["key"].lower()
print(f"{fieldname}.label {error['name']}")
print(f"{fieldname}.draw LINE")
print(f"{fieldname}.type GAUGE")
print(f"{fieldname}.info {error['info']}")
###################
# GPS Time Offset #
###################
print("multigraph gps_time_offset")
print("graph_title GPS Time Offset")
print("graph_vlabel seconds")
print("graph_category gnss")
print("toff.label Time Offset")
print("toff.draw LINE")
print("toff.type GAUGE")
print("toff.info Time offset between GPS and System clocks")
sys.exit(0)
# For the colours, USED variants are the standard Munin colours,
# UNUSED colours are dimmed by dropping the saturation to 33%.
GPS_CONSTELLATIONS = [
{
"gnssid": 0,
"name": "GPS",
"info": "The original US system. Also called NavStar",
"colour": "00CC00",
},
{
"gnssid": 1,
"name": "SBAS",
"info": "Space Based Augmentation system",
"colour": "0066B3",
},
{
"gnssid": 2,
"name": "Galileo",
"info": "The European Galileo system",
"colour": "FF8000",
},
{
"gnssid": 3,
"name": "BeiDou",
"info": "The Chinese BeiDou system",
"colour": "FFCC00",
},
{
"gnssid": 5,
"name": "QZSS",
"info": (
"The Japanese Quasi-Zenith Satellite System."
" Only visible around Japan and Australia"
),
"colour": "330099",
},
{
"gnssid": 6,
"name": "GLONASS",
"info": "The Russian GLObal Navigation System",
"colour": "990099",
},
]
GPS_PRECISIONS = {
"hdop": "Horizontal",
"vdop": "Vertical",
"pdop": "Position (3D)",
"tdop": "Time",
"gdop": "Geometric",
}
GPS_ERRORS = [
{
"key": "epc",
"name": "Climb",
"info": "Estimated climb error in meters per second. Certainty unknown",
},
{
"key": "epd",
"name": "Direction",
"info": "Estimated track (direction) error in degrees. Certainty unknown",
},
{
"key": "eph",
"name": "Position (2D)",
"info": "Estimated horizontal position (2D) error in meters. Also known as Estimated Position Error (epe). Certainty unknown",
},
{
"key": "eps",
"name": "Speed",
"info": "Estimated speed error in metres per second. Certainty unknown",
},
{
"key": "ept",
"name": "Time",
"info": "Estimated time stamp error in seconds. Certainty unknown",
},
{
"key": "epx",
"name": "Longitude",
"info": "Longitude error estimate in meters. Certainty unknown",
},
{
"key": "epy",
"name": "Latitude",
"info": "Latitude error estimate in meters. Certainty unknown",
},
{
"key": "epv",
"name": "Vertical",
"info": "Estimated vertical error in meters. Certainty unknown",
},
]
if __name__ == "__main__":
for k, v in enumerate(sys.argv):
debug(f"{k}: {v}")
GPSD_SERVER = sys.argv[0].split("_", maxsplit=1)[1]
GPS_COLLECT_TIME = os.environ.get("gpsd_collect_time", 5)
if os.environ.get("MUNIN_CAP_MULTIGRAPH", None) is None:
need_multigraph()
if len(sys.argv) >= 2 and sys.argv[1] == "config":
config()
try:
from gpsdclient import GPSDClient
except ImportError:
print("Unable to import gpsdclient. Do you need to 'pip install gpsdclient'?")
client = GPSDClient(host=GPSD_SERVER)
gps_stanza = {}
debug(f"Gathering from {GPSD_SERVER} for up to {GPS_COLLECT_TIME}s...")
start_date = datetime.datetime.now()
for result in client.dict_stream(convert_datetime=True):
debug(f"{result['class']}")
if result["class"] == "SKY" and "SKY" not in gps_stanza.keys():
# First time we've seen a SKY message. Process it
print("multigraph gps_satellites")
for constellation in sorted(
GPS_CONSTELLATIONS, key=lambda item: item.get("gnssid")
):
fieldname = constellation.get("name").lower()
used = len(
[
x
for x in result["satellites"]
if x["gnssid"] == constellation["gnssid"] and x["used"]
]
)
unused = len(
[
x
for x in result["satellites"]
if x["gnssid"] == constellation["gnssid"] and not x["used"]
]
)
print(f"{fieldname}_used.value {used}")
print(f"{fieldname}.value {unused}")
print(f"total.value {len(result['satellites'])}")
print("multigraph gps_precision")
for dop in GPS_PRECISIONS.keys():
fieldname = dop.lower()
value = result[dop] or "U"
print(f"{fieldname}.value {value}")
if result["class"] == "TPV" and "TPV" not in gps_stanza.keys():
print("multigraph gps_error_estimate")
for error in GPS_ERRORS:
fieldname = error["key"].lower()
try:
value = result[error["key"]]
except KeyError:
value = "U"
print(f"{fieldname}.value {value}")
if result["class"] == "PPS" and "PPS" not in gps_stanza.keys():
print("multigraph gps_time_offset")
value = (result["clock_sec"] - result["real_sec"]) + (
(result["clock_nsec"] - result["real_nsec"]) / 1e9
)
print(f"toff.value {value:.10f}")
gps_stanza[result["class"]] = result
# We want SKY, TPV and PPS
if (
"SKY" in gps_stanza.keys()
and "TPV" in gps_stanza.keys()
and "PPS" in gps_stanza.keys()
):
# Found them all, stop looping
break
if (datetime.datetime.now() - start_date).total_seconds() >= GPS_COLLECT_TIME:
print(
f"# Waited more than {GPS_COLLECT_TIME} seconds. This'll have to do: {gps_stanza.keys()}"
)