munin-contrib/plugins/celery/celery_tasks_states

153 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python
"""=cut
=head1 NAME
celery_tasks_states - Munin plugin to monitor the number of Celery tasks in each state.
=head1 REQUIREMENTS
- Python
- celery (http://celeryproject.org/)
- celerymon (http://github.com/ask/celerymon)
Note: don't forget to enable sending of the events on the celery daemon - run it with the --events option
=head1 CONFIGURATION
Default configuration:
[celery_tasks_states]
env.api_url http://localhost:8989
env.workers all
If workers variable is not set or set to "all", task number for all the workers is monitored.
You can optionally set the workers variable to the string of hostnames you want to monitor separated by a comma.
For example:
[celery_tasks]
env.workers localhost,foo.bar.net,bar.foo.net
This would only monitor the number of tasks for the workers with the hostnames "localhost", "foo.bar.net" and "bar.foo.net"
=head1 MAGIC MARKERS
#%# family=manual
#%# capabilities=autoconf
=head1 AUTHOR
Tomaz Muraus (http://github.com/Kami/munin-celery)
=head1 LICENSE
GPLv2
=cut"""
import os
import sys
import urllib
try:
import json
except:
import simplejson as json
API_URL = 'http://localhost:8989'
URL_ENDPOINTS = {
'workers': '/api/worker/',
'worker_tasks': '/api/worker/%s/tasks',
'tasks': '/api/task/',
'task_names': '/api/task/name/',
'task_details': '/api/task/name/%s',
}
TASK_STATES = (
'PENDING',
'RECEIVED',
'STARTED',
'SUCCESS',
'FAILURE',
'REVOKED',
'RETRY'
)
def get_data(what, api_url, *args):
try:
request = urllib.urlopen('%s%s' % (api_url, \
URL_ENDPOINTS[what] % (args)))
response = request.read()
return json.loads(response)
except IOError:
print 'Could not connect to the celerymon webserver'
sys.exit(-1)
def check_web_server_status(api_url):
try:
request = urllib.urlopen(api_url)
response = request.read()
except IOError:
print 'Could not connect to the celerymon webserver'
sys.exit(-1)
def clean_state_name(state_name):
return state_name.lower()
# Config
def print_config(workers = None):
if workers:
print 'graph_title Celery tasks in each state [workers = %s]' % (', ' . join(workers))
else:
print 'graph_title Celery tasks in each state'
print 'graph_args --lower-limit 0'
print 'graph_scale no'
print 'graph_vlabel tasks per ${graph_period}'
print 'graph_category other'
for name in TASK_STATES:
name = clean_state_name(name)
print '%s.label %s' % (name, name)
print '%s.type DERIVE' % (name)
print '%s.min 0' % (name)
print '%s.info number of %s tasks' % (name, name)
# Values
def print_values(workers = None, api_url = None):
data = get_data('tasks', api_url)
counters = dict([(key, 0) for key in TASK_STATES])
for task_name, task_data in data:
state = task_data['state']
hostname = task_data['worker']['hostname']
if workers and hostname not in workers:
continue
counters[state] += 1
for name in TASK_STATES:
name_cleaned = clean_state_name(name)
value = counters[name]
print '%s.value %d' % (name_cleaned, value)
if __name__ == '__main__':
workers = os.environ.get('workers', 'all')
api_url = os.environ.get('api_url', API_URL)
check_web_server_status(api_url)
if workers in [None, '', 'all']:
workers = None
else:
workers = workers.split(',')
if len(sys.argv) > 1:
if sys.argv[1] == 'config':
print_config(workers)
elif sys.argv[1] == 'autoconf':
print 'yes'
else:
print_values(workers, api_url)