Merge pull request #1335 from darac/gpsd

Add gpsd_ plugin
This commit is contained in:
Kenyon Ralph 2023-01-13 21:13:00 -08:00 committed by GitHub
commit 0468886a97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 362 additions and 0 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

362
plugins/gpsd/gpsd_ Executable file
View File

@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""
=head1 NAME
gpsd_ - Munin plugin to show GPSd quality
=head1 CONFIGURATION
Requires Python3 and the C<gpsdclient> library.
You should specify the host which gpsd is running on by
creating a symlink to this file, e.g.:
ln -s /path/to/gps_ /etc/munin/plugins/gpsd_localhost
=head1 ENVIRONMENT VARIABLES
env.gpsd_collect_time Maximum time to collect data for (in seconds, default 5)
=head1 AUTHOR
Paul Saunders <darac+munin@darac.org.uk>
=head1 MAGIC MARKERS
#%# family=contrib
#%# capabilities=multigraph
=cut
"""
import os
import sys
import colorsys
import datetime
# Helper functions
def debug(msg):
if os.environ.get("MUNIN_DEBUG", "0") == "1":
print(f"# {msg}", file=sys.stderr)
def rgb_to_hex(rgb):
return "%02X%02X%02X" % tuple(map(int, rgb))
def hex_to_rgb(hexa):
return tuple(int(hexa[i : i + 2], 16) for i in (0, 2, 4))
def desaturate(hexa):
debug(f"Desaturating {hexa} ({hex_to_rgb(hexa)})")
(h, _, v) = colorsys.rgb_to_hsv(*hex_to_rgb(hexa))
debug(f"Got ({h}, _, {v})")
return rgb_to_hex(colorsys.hsv_to_rgb(h, 1 / 3, v))
def need_multigraph():
print(f"graph_title {sys.argv[0]} needs MULTIGRAPH")
print(
"graph_info This plugin requires multigraph capabilities,"
" but this node does not support it"
)
print("graph_total total")
sys.exit(1)
# The main functionlity
def config():
"""
Note that we don't graph all values that GPSd could give us.
For example, graphing position could be a security issue
(revealing the physical location of an internet host). However,
graphing things like the _precision_ of position and the number
of visible satellites (without listing which satellites) _should_
give a reasonable balance between security and usability.
"""
##################
# GPS Satellites #
##################
print("multigraph gps_satellites")
print("graph_title GPS Satellites")
print("graph_vlabel # in view")
print("graph_category gnss")
print("graph_printf %4.0lf") # Integers only
graph_order = []
for constellation in sorted(
GPS_CONSTELLATIONS, key=lambda item: item.get("gnssid")
):
fieldname = constellation.get("name").lower()
# Start with the used sats
graph_order.append(f"{fieldname}_used")
print(f"{fieldname}_used.label {constellation['name']:7} (Used)")
print(f"{fieldname}_used.draw AREASTACK")
print(f"{fieldname}_used.type GAUGE")
print(
f"{fieldname}_used.info The number of {constellation['name']}"
f" used by the solution"
)
print(f"{fieldname}_used.colour {constellation['colour']}")
# And now the unused sats
graph_order.append(f"{fieldname}")
print(f"{fieldname}.label {constellation['name']:7}")
print(f"{fieldname}.draw AREASTACK")
print(f"{fieldname}.type GAUGE")
print(f"{fieldname}.info {constellation['info']}")
print(f"{fieldname}.colour {desaturate(constellation['colour'])}")
# Finally, create a summation line
graph_order.append("total")
print("total.label Total")
print("total.info Total number of satellites reported by GPSd")
print("total.type GAUGE")
print("total.draw LINE")
print("total.colour 000000")
#################
# GPS Precision #
#################
print("multigraph gps_precision")
print("graph_title GPS Precision")
print("graph_vlabel DOP")
print("graph_category gnss")
print("graph_args -l 0")
print(
"graph_info <ul><li>&lt;=1: Ideal</li>"
"<li>1-2: Excellent</li>"
"<li>2-5: Good</li>"
"<li>5-10: Moderate</li>"
"<li>10-20: Fair</li>"
"<li>&gt;20: Poor</li></ul>"
)
for dop in GPS_PRECISIONS.keys():
fieldname = dop.lower()
print(f"{fieldname}.label {GPS_PRECISIONS[dop]}")
print(f"{fieldname}.draw LINE")
print(f"{fieldname}.type GAUGE")
print(f"{fieldname}.info {GPS_PRECISIONS[dop]} Dilution of Precision")
print(f"{fieldname}.warning :10")
print(f"{fieldname}.critical :20")
#######################
# GPS Error Estimates #
#######################
print("multigraph gps_error_estimate")
print("graph_title GPS Error Estimates")
print("graph_vlabel estimated error")
print("graph_category gnss")
for error in GPS_ERRORS:
fieldname = error["key"].lower()
print(f"{fieldname}.label {error['name']}")
print(f"{fieldname}.draw LINE")
print(f"{fieldname}.type GAUGE")
print(f"{fieldname}.info {error['info']}")
###################
# GPS Time Offset #
###################
print("multigraph gps_time_offset")
print("graph_title GPS Time Offset")
print("graph_vlabel seconds")
print("graph_category gnss")
print("toff.label Time Offset")
print("toff.draw LINE")
print("toff.type GAUGE")
print("toff.info Time offset between GPS and System clocks")
sys.exit(0)
# For the colours, USED variants are the standard Munin colours,
# UNUSED colours are dimmed by dropping the saturation to 33%.
GPS_CONSTELLATIONS = [
{
"gnssid": 0,
"name": "GPS",
"info": "The original US system. Also called NavStar",
"colour": "00CC00",
},
{
"gnssid": 1,
"name": "SBAS",
"info": "Space Based Augmentation system",
"colour": "0066B3",
},
{
"gnssid": 2,
"name": "Galileo",
"info": "The European Galileo system",
"colour": "FF8000",
},
{
"gnssid": 3,
"name": "BeiDou",
"info": "The Chinese BeiDou system",
"colour": "FFCC00",
},
{
"gnssid": 5,
"name": "QZSS",
"info": (
"The Japanese Quasi-Zenith Satellite System."
" Only visible around Japan and Australia"
),
"colour": "330099",
},
{
"gnssid": 6,
"name": "GLONASS",
"info": "The Russian GLObal Navigation System",
"colour": "990099",
},
]
GPS_PRECISIONS = {
"hdop": "Horizontal",
"vdop": "Vertical",
"pdop": "Position (3D)",
"tdop": "Time",
"gdop": "Geometric",
}
GPS_ERRORS = [
{
"key": "epc",
"name": "Climb",
"info": "Estimated climb error in meters per second. Certainty unknown",
},
{
"key": "epd",
"name": "Direction",
"info": "Estimated track (direction) error in degrees. Certainty unknown",
},
{
"key": "eph",
"name": "Position (2D)",
"info": "Estimated horizontal position (2D) error in meters. Also known as Estimated Position Error (epe). Certainty unknown",
},
{
"key": "eps",
"name": "Speed",
"info": "Estimated speed error in metres per second. Certainty unknown",
},
{
"key": "ept",
"name": "Time",
"info": "Estimated time stamp error in seconds. Certainty unknown",
},
{
"key": "epx",
"name": "Longitude",
"info": "Longitude error estimate in meters. Certainty unknown",
},
{
"key": "epy",
"name": "Latitude",
"info": "Latitude error estimate in meters. Certainty unknown",
},
{
"key": "epv",
"name": "Vertical",
"info": "Estimated vertical error in meters. Certainty unknown",
},
]
if __name__ == "__main__":
for k, v in enumerate(sys.argv):
debug(f"{k}: {v}")
GPSD_SERVER = sys.argv[0].split("_", maxsplit=1)[1]
GPS_COLLECT_TIME = os.environ.get("gpsd_collect_time", 5)
if os.environ.get("MUNIN_CAP_MULTIGRAPH", None) is None:
need_multigraph()
if len(sys.argv) >= 2 and sys.argv[1] == "config":
config()
try:
from gpsdclient import GPSDClient
except ImportError:
print("Unable to import gpsdclient. Do you need to 'pip install gpsdclient'?")
client = GPSDClient(host=GPSD_SERVER)
gps_stanza = {}
debug(f"Gathering from {GPSD_SERVER} for up to {GPS_COLLECT_TIME}s...")
start_date = datetime.datetime.now()
for result in client.dict_stream(convert_datetime=True):
debug(f"{result['class']}")
if result["class"] == "SKY" and "SKY" not in gps_stanza.keys():
# First time we've seen a SKY message. Process it
print("multigraph gps_satellites")
for constellation in sorted(
GPS_CONSTELLATIONS, key=lambda item: item.get("gnssid")
):
fieldname = constellation.get("name").lower()
used = len(
[
x
for x in result["satellites"]
if x["gnssid"] == constellation["gnssid"] and x["used"]
]
)
unused = len(
[
x
for x in result["satellites"]
if x["gnssid"] == constellation["gnssid"] and not x["used"]
]
)
print(f"{fieldname}_used.value {used}")
print(f"{fieldname}.value {unused}")
print(f"total.value {len(result['satellites'])}")
print("multigraph gps_precision")
for dop in GPS_PRECISIONS.keys():
fieldname = dop.lower()
value = result[dop] or "U"
print(f"{fieldname}.value {value}")
if result["class"] == "TPV" and "TPV" not in gps_stanza.keys():
print("multigraph gps_error_estimate")
for error in GPS_ERRORS:
fieldname = error["key"].lower()
try:
value = result[error["key"]]
except KeyError:
value = "U"
print(f"{fieldname}.value {value}")
if result["class"] == "PPS" and "PPS" not in gps_stanza.keys():
print("multigraph gps_time_offset")
value = (result["clock_sec"] - result["real_sec"]) + (
(result["clock_nsec"] - result["real_nsec"]) / 1e9
)
print(f"toff.value {value:.10f}")
gps_stanza[result["class"]] = result
# We want SKY, TPV and PPS
if (
"SKY" in gps_stanza.keys()
and "TPV" in gps_stanza.keys()
and "PPS" in gps_stanza.keys()
):
# Found them all, stop looping
break
if (datetime.datetime.now() - start_date).total_seconds() >= GPS_COLLECT_TIME:
print(
f"# Waited more than {GPS_COLLECT_TIME} seconds. This'll have to do: {gps_stanza.keys()}"
)