282 lines
8.1 KiB
Python
Executable File
282 lines
8.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
=head1 NAME
|
|
|
|
arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85
|
|
and compatible cable modems
|
|
|
|
=head1 DESCRIPTION
|
|
Connect to the web-frontend and get current DOCSIS status of upstream and
|
|
downstream channels. (Signal Power, SNR, Lock Status)
|
|
|
|
|
|
=head1 REQUIREMENTS
|
|
|
|
=over 4
|
|
|
|
=item BeautifulSoup
|
|
|
|
=item pycryptodome
|
|
|
|
=back
|
|
|
|
|
|
=head1 CONFIGURATION
|
|
|
|
=head2 Example
|
|
|
|
[arris]
|
|
env.url http://192.168.100.1
|
|
env.username admin
|
|
env.password yourpassword
|
|
|
|
|
|
=head2 Parameters
|
|
|
|
url - URL to web-frontend
|
|
username - defaults to "admin"
|
|
password - valid password
|
|
|
|
|
|
=head1 REFERENCES
|
|
https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/
|
|
|
|
|
|
=head1 AUTHOR
|
|
|
|
Copyright (c) 2019 Daniel Hiepler <d-munin@coderdu.de>
|
|
|
|
Copyright (c) 2004-2009 Nicolas Stransky <Nico@stransky.cx>
|
|
|
|
Copyright (c) 2018 Lars Kruse <devel@sumpfralle.de>
|
|
|
|
|
|
=head1 LICENSE
|
|
|
|
Permission to use, copy, and modify this software with or without fee
|
|
is hereby granted, provided that this entire notice is included in
|
|
all source code copies of any software which is or includes a copy or
|
|
modification of this software.
|
|
|
|
THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR
|
|
IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
|
|
REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
|
|
MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
|
|
PURPOSE.
|
|
|
|
|
|
=head1 MAGIC MARKERS
|
|
|
|
#%# family=contrib
|
|
|
|
=cut
|
|
"""
|
|
|
|
import binascii
|
|
from bs4 import BeautifulSoup
|
|
from Crypto.Cipher import AES
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import requests
|
|
import sys
|
|
import os
|
|
|
|
|
|
"""
|
|
The CREDENTIAL_COOKIE below equals the following:
|
|
base64.encodebytes(b'{ "unique":"280oaPSLiF", "family":"852", "modelname":"TG2492LG-85", '
|
|
'"name":"technician", "tech":true, "moca":0, "wifi":5, "conType":"WAN", '
|
|
'"gwWan":"f", "DefPasswdChanged":"YES" }').decode()
|
|
"""
|
|
CREDENTIAL_COOKIE = "eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiVEcy"\
|
|
"NDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOjAsICJ3"\
|
|
"aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2hhbmdlZCI6"\
|
|
"IllFUyIgfQ=="
|
|
|
|
|
|
def login(session, url, username, password):
|
|
"""login to """
|
|
# get login page
|
|
r = session.get(f"{url}")
|
|
# parse HTML
|
|
h = BeautifulSoup(r.text, "lxml")
|
|
# get session id from javascript in head
|
|
current_session_id = re.search(r".*var currentSessionId = '(.+)';.*", h.head.text)[1]
|
|
|
|
# encrypt password
|
|
salt = os.urandom(8)
|
|
iv = os.urandom(8)
|
|
key = hashlib.pbkdf2_hmac(
|
|
'sha256',
|
|
bytes(password.encode("ascii")),
|
|
salt,
|
|
iterations=1000,
|
|
dklen=128 / 8
|
|
)
|
|
secret = {"Password": password, "Nonce": current_session_id}
|
|
plaintext = bytes(json.dumps(secret).encode("ascii"))
|
|
associated_data = "loginPassword"
|
|
# initialize cipher
|
|
cipher = AES.new(key, AES.MODE_CCM, iv)
|
|
# set associated data
|
|
cipher.update(bytes(associated_data.encode("ascii")))
|
|
# encrypt plaintext
|
|
encrypt_data = cipher.encrypt(plaintext)
|
|
# append digest
|
|
encrypt_data += cipher.digest()
|
|
# return
|
|
login_data = {
|
|
'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"),
|
|
'Name': username,
|
|
'Salt': binascii.hexlify(salt).decode("ascii"),
|
|
'Iv': binascii.hexlify(iv).decode("ascii"),
|
|
'AuthData': associated_data
|
|
}
|
|
|
|
# login
|
|
r = session.put(
|
|
f"{url}/php/ajaxSet_Password.php",
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"csrfNonce": "undefined"
|
|
},
|
|
data=json.dumps(login_data)
|
|
)
|
|
|
|
# parse result
|
|
result = json.loads(r.text)
|
|
# success?
|
|
if result['p_status'] == "Fail":
|
|
print("login failure", file=sys.stderr)
|
|
exit(-1)
|
|
# remember CSRF nonce
|
|
csrf_nonce = result['nonce']
|
|
|
|
# prepare headers
|
|
session.headers.update({
|
|
"X-Requested-With": "XMLHttpRequest",
|
|
"csrfNonce": csrf_nonce,
|
|
"Origin": f"{url}/",
|
|
"Referer": f"{url}/"
|
|
})
|
|
# set credentials cookie
|
|
session.cookies.set("credential", CREDENTIAL_COOKIE)
|
|
|
|
# set session
|
|
r = session.post(f"{url}/php/ajaxSet_Session.php")
|
|
|
|
|
|
def docsis_status(session):
|
|
"""get current DOCSIS status page, parse and return channel data"""
|
|
r = session.get(f"{url}/php/status_docsis_data.php")
|
|
# extract json from javascript
|
|
json_downstream_data = re.search(r".*json_dsData = (.+);.*", r.text)[1]
|
|
json_upstream_data = re.search(r".*json_usData = (.+);.*", r.text)[1]
|
|
# parse json
|
|
downstream_data = json.loads(json_downstream_data)
|
|
upstream_data = json.loads(json_upstream_data)
|
|
# convert lock status to numeric values
|
|
for d in [upstream_data, downstream_data]:
|
|
for c in d:
|
|
if c['LockStatus'] == "ACTIVE" or c['LockStatus'] == "Locked":
|
|
c['LockStatus'] = 1
|
|
else:
|
|
c['LockStatus'] = 0
|
|
return downstream_data, upstream_data
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == "__main__":
|
|
# get config
|
|
url = os.getenv("url")
|
|
username = os.getenv("username")
|
|
password = os.getenv("password")
|
|
# validate config
|
|
if not url or not username or not password:
|
|
print("Set url, username and password first.", file=sys.stderr)
|
|
exit(1)
|
|
# create session
|
|
session = requests.Session()
|
|
# login with username and password
|
|
login(session, url, username, password)
|
|
# get DOCSIS status
|
|
downstream, upstream = docsis_status(session)
|
|
# prepare munin graph info
|
|
graph_descriptions = [
|
|
{
|
|
"name": "up_signal",
|
|
"title": "DOCSIS Upstream signal strength",
|
|
"vlabel": "dBmV",
|
|
"info": "DOCSIS upstream signal strength by channel",
|
|
"data": upstream,
|
|
"key": "PowerLevel"
|
|
},
|
|
{
|
|
"name": "up_lock",
|
|
"title": "DOCSIS Upstream lock",
|
|
"vlabel": "locked",
|
|
"info": "DOCSIS upstream channel lock status",
|
|
"data": upstream,
|
|
"key": "LockStatus"
|
|
},
|
|
{
|
|
"name": "down_signal",
|
|
"title": "DOCSIS Downstream signal strength",
|
|
"vlabel": "dBmV",
|
|
"info": "DOCSIS downstream signal strength by channel",
|
|
"data": downstream,
|
|
"key": "PowerLevel"
|
|
},
|
|
{
|
|
"name": "down_lock",
|
|
"title": "DOCSIS Downstream lock",
|
|
"vlabel": "locked",
|
|
"info": "DOCSIS downstream channel lock status",
|
|
"data": downstream,
|
|
"key": "LockStatus"
|
|
},
|
|
{
|
|
"name": "down_snr",
|
|
"title": "DOCSIS Downstream signal/noise ratio",
|
|
"vlabel": "dB",
|
|
"info": "SNR/MER",
|
|
"data": downstream,
|
|
"key": "SNRLevel"
|
|
}
|
|
]
|
|
|
|
# configure ?
|
|
if len(sys.argv) > 1 and "config" == sys.argv[1]:
|
|
# process all graphs
|
|
for g in graph_descriptions:
|
|
# graph config
|
|
print(f"multigraph docsis_{g['name']}")
|
|
print(f"graph_title {g['title']}")
|
|
print("graph_category network")
|
|
print(f"graph_vlabel {g['vlabel']}")
|
|
print(f"graph_info {g['info']}")
|
|
print("graph_scale no")
|
|
|
|
# channels
|
|
for c in g['data']:
|
|
# only use channels with PowerLevel
|
|
if not c['PowerLevel']:
|
|
continue
|
|
info_text = f"Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}"
|
|
print(f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)")
|
|
print(f"channel_{c['ChannelID']}.info {info_text}")
|
|
|
|
# output values ?
|
|
else:
|
|
# process all graphs
|
|
for g in graph_descriptions:
|
|
print(f"multigraph docsis_{g['name']}")
|
|
# channels
|
|
for c in g['data']:
|
|
# only use channels with PowerLevel
|
|
if not c['PowerLevel']:
|
|
continue
|
|
print(f"channel_{c['ChannelID']}.value {c[g['key']]}")
|