munin-contrib/plugins/sensors/switchbotmeterbt

210 lines
6.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
=head1 NAME
switchbotmeterbt - Munin plugin to monitor temperature/humidity with SwitchBot Meter BLE
=head1 CONFIGURATION
Python 3 and bluepy are necessary.
Example for Ubuntu:
$ sudo apt install python3-pip
$ sudo pip3 install bluepy
=head1 ENVIRONMENT VARIABLES
user : root privilege is necessary for BLE scan
env.macaddr : Mac Address(es) of SwitchBot Meter(s) separated by white-space (Required)
env.hcidev : HCI device index. (Optional, default is 0)
env.tempunit : Temperature unit. (Optional, default is C)
env.scantimeout : Timeout for BLE scan. (Optional, default is 5.0 seconds)
Example:
[switchbotmeterbt]
user root
env.macaddr aa:aa:aa:aa:aa:aa bb:bb:bb:bb:bb:bb cc:cc:cc:cc:cc:cc
=head1 NOTES
For more details about SwitchBot Meter, see https://www.switch-bot.com/products/switchbot-meter
=head1 AUTHOR
K.Cima https://github.com/shakemid
=head1 LICENSE
GPLv2
SPDX-License-Identifier: GPL-2.0-only
=head1 Magic markers
#%# family=contrib
#%# capabilities=
=cut
"""
import sys
import os
import subprocess
from bluepy.btle import Scanner, DefaultDelegate
class SwitchbotScanDelegate(DefaultDelegate):
def __init__(self, macaddrs):
super().__init__()
self.sensorValues = {}
self.macaddrs = macaddrs
# Called when advertising data is received from an LE device
def handleDiscovery(self, dev, isNewDev, isNewData):
if dev.addr in self.macaddrs:
for (adtype, desc, value) in dev.getScanData():
if desc == '16b Service Data':
munin_debug(value)
self._decodeSensorData(dev, value)
def _decodeSensorData(self, dev, value):
# Extract lower 6 octets
valueBinary = bytes.fromhex(value[4:16])
# Refer to Meter BLE open API
# https://github.com/OpenWonderLabs/python-host/wiki/Meter-BLE-open-API
deviceType = chr(valueBinary[0] & 0b01111111)
battery = valueBinary[2] & 0b01111111
tint = valueBinary[4] & 0b01111111
tdec = valueBinary[3] & 0b00001111
temperature = tint + tdec / 10
isTemperatureAboveFreezing = valueBinary[4] & 0b10000000
if not isTemperatureAboveFreezing:
temperature = -temperature
humidity = valueBinary[5] & 0b01111111
self.sensorValues[dev.addr] = {
'DeviceType': deviceType,
'Temperature': temperature,
'Humidity': humidity,
'Battery': battery,
'RSSI': dev.rssi
}
munin_debug(str(self.sensorValues))
class SwitchbotMeterPlugin():
def __init__(self):
self.params = {}
self.params['pluginname'] = os.path.basename(__file__)
self.params['macaddrs'] = [str.lower(macaddr)
for macaddr in os.getenv('macaddr').split()]
self.params['hcidev'] = int(os.getenv('hcidev', 0))
self.params['tempunit'] = os.getenv('tempunit', 'C')
self.params['scantimeout'] = float(os.getenv('scantimeout', 5.0))
self.graphs = {
'temp': {
'name': 'Temperature',
'attrs': [
'graph_title SwitchBot Meter Temperature',
'graph_category sensors',
'graph_vlabel Temp ' + self.params['tempunit'],
'graph_scale no',
'graph_args --base 1000'
]
},
'humid': {
'name': 'Humidity',
'attrs': [
'graph_title SwitchBot Meter Humidity',
'graph_category sensors',
'graph_vlabel Humid %',
'graph_scale no',
'graph_args --base 1000'
]
},
'batt': {
'name': 'Battery',
'attrs': [
'graph_title SwitchBot Meter Battery',
'graph_category sensors',
'graph_vlabel %',
'graph_scale no',
'graph_args --base 1000 --lower-limit 0 --upper-limit 100'
]
},
'rssi': {
'name': 'RSSI',
'attrs': [
'graph_title SwitchBot Meter RSSI',
'graph_category sensors',
'graph_vlabel dB',
'graph_scale no',
'graph_args --base 1000'
]
}
}
def config(self):
# print config
for k in self.graphs.keys():
print('multigraph ' + self.params['pluginname'] + '_' + k)
for line in self.graphs[k]['attrs']:
print(line)
for macaddr in self.params['macaddrs']:
field = macaddr.replace(':', '')
print(field + '.label ' + macaddr)
def fetch(self):
# scan to fetch data
scanner = Scanner(self.params['hcidev']).withDelegate(SwitchbotScanDelegate(self.params['macaddrs']))
try:
# sometimes it might fail
scanner.scan(self.params['scantimeout'])
for macaddr in self.params['macaddrs']:
check = scanner.delegate.sensorValues[macaddr]
except KeyError as e:
munin_error('retry scan for exception: ' + str(type(e)))
scanner.scan(self.params['scantimeout'])
except Exception as e:
munin_error('reset hci and retry scan for exception: ' + str(type(e)))
subprocess.call(f'hciconfig hci{self.params["hcidev"]} down && hciconfig hci{self.params["hcidev"]} up', shell=True)
scanner.scan(self.params['scantimeout'])
# print value
for k in self.graphs.keys():
print('multigraph ' + self.params['pluginname'] + '_' + k)
for macaddr in self.params['macaddrs']:
field = macaddr.replace(':', '')
try:
print(field + '.value ' +
str(scanner.delegate.sensorValues[macaddr][self.graphs[k]['name']]))
except KeyError:
pass
def munin_error(message):
print(message, file=sys.stderr)
def munin_debug(message):
if os.getenv('MUNIN_DEBUG') == '1':
print('# ' + message)
def main():
plugin = SwitchbotMeterPlugin()
if len(sys.argv) > 1 and sys.argv[1] == 'config':
plugin.config()
if os.getenv('MUNIN_CAP_DIRTYCONFIG') == '1':
plugin.fetch()
else:
plugin.fetch()
if __name__ == '__main__':
main()