Merge from unstable into master.
stable branch spun off master before this merge.
alq666 committed Aug 14, 2012
1 parent 7ccc4f2 commit 4d2a3a7
Showing 14 changed files with 636 additions and 36 deletions.
.travis.yml: 3 changes (3 additions, 0 deletions)
@@ -14,6 +14,9 @@ script: nosetests tests
before_script:
- sudo apt-get install sysstat
- curl -L https://github.com/downloads/elasticsearch/elasticsearch/elasticsearch-0.19.8.deb > /tmp/es.deb
+  - sudo apt-get install haproxy
+  - curl -L https://raw.github.com/DataDog/dd-agent/check-haproxy/tests/haproxy.cfg > /tmp/haproxy.cfg
+  - curl -L https://github.com/downloads/elasticsearch/elasticsearch/elasticsearch-0.19.4.deb > /tmp/es.deb
- sudo dpkg -i /tmp/es.deb
- sudo service elasticsearch start
env:
checks/__init__.py: 38 changes (24 additions, 14 deletions)
@@ -126,13 +126,13 @@ def get_metric_names(self):
"Get all metric names"
return self._sample_store.keys()

-    def save_gauge(self, metric, value, timestamp=None, tags=None):
+    def save_gauge(self, metric, value, timestamp=None, tags=None, hostname=None):
""" Save a gauge value. """
if not self.is_gauge(metric):
self.gauge(metric)
-        self.save_sample(metric, value, timestamp, tags)
+        self.save_sample(metric, value, timestamp, tags, hostname)

-    def save_sample(self, metric, value, timestamp=None, tags=None):
+    def save_sample(self, metric, value, timestamp=None, tags=None, hostname=None):
"""Save a simple sample, evict old values if needed
"""
if timestamp is None:
@@ -153,12 +153,12 @@ def save_sample(self, metric, value, timestamp=None, tags=None):

# Data eviction rules
if self.is_gauge(metric):
-            self._sample_store[metric][tags] = ((timestamp, value), )
+            self._sample_store[metric][tags] = ((timestamp, value, hostname), )
elif self.is_counter(metric):
if self._sample_store[metric].get(tags) is None:
-                self._sample_store[metric][tags] = [(timestamp, value)]
+                self._sample_store[metric][tags] = [(timestamp, value, hostname)]
else:
-                self._sample_store[metric][tags] = self._sample_store[metric][tags][-1:] + [(timestamp, value)]
+                self._sample_store[metric][tags] = self._sample_store[metric][tags][-1:] + [(timestamp, value, hostname)]
else:
raise CheckException("%s must be either gauge or counter, skipping sample at %s" % (metric, time.ctime(timestamp)))

@@ -180,7 +180,10 @@ def _rate(cls, sample1, sample2):
if delta < 0:
raise UnknownValue()

-            return (sample2[0], delta / interval)
+            if len(sample2) == 3:
+                return (sample2[0], delta / interval, sample2[2])
+            else:
+                return (sample2[0], delta / interval, None)
except Infinity:
raise
except UnknownValue:
@@ -216,7 +219,7 @@ def get_sample_with_timestamp(self, metric, tags=None):
def get_sample(self, metric, tags=None):
"Return the last value for that metric"
x = self.get_sample_with_timestamp(metric, tags)
-        assert type(x) == types.TupleType and len(x) == 2, x
+        assert type(x) == types.TupleType and len(x) == 3, x
return x[1]

def get_samples_with_timestamps(self):
@@ -251,11 +254,18 @@ def get_metrics(self):
for m in self._sample_store:
try:
for t in self._sample_store[m]:
-                    ts, val = self.get_sample_with_timestamp(m, t)
-                    if t is None:
-                        metrics.append((m, int(ts), val, {}))
-                    else:
-                        metrics.append((m, int(ts), val, {"tags": list(t)}))
+                    ts, val, hostname = self.get_sample_with_timestamp(m, t)
+                    if not hostname:
+                        if t is None:
+                            metrics.append((m, int(ts), val, {}))
+                        else:
+                            metrics.append((m, int(ts), val, {"tags": list(t)}))
+                    else:
+                        if t is None:
+                            metrics.append((m, int(ts), val, {"host_name": hostname}))
+                        else:
+                            metrics.append((m, int(ts), val, {"tags": list(t), "host_name": hostname}))

except:
pass
return metrics
@@ -267,4 +277,4 @@ def gethostname(agentConfig):
try:
return socket.gethostname()
except socket.error, e:
logging.debug("processes: unable to get hostanme: " + str(e))
logging.debug("processes: unable to get hostname: " + str(e))
checks/common.py: 7 changes (5 additions, 2 deletions)
@@ -43,7 +43,9 @@
from checks.cacti import Cacti
from checks.varnish import Varnish

-from checks.db.elastic import ElasticSearch
+from checks.db.elastic import ElasticSearch, ElasticSearchClusterStatus
+from checks.net.haproxy import HAProxyMetrics, HAProxyEvents


from checks.ec2 import EC2

@@ -101,6 +103,7 @@ def __init__(self, agentConfig, emitters):
Redis(self.checksLogger),
Varnish(self.checksLogger),
ElasticSearch(self.checksLogger),
+            HAProxyMetrics(self.checksLogger)
]

for module_spec in [s.strip() for s in self.agentConfig.get('custom_checks', '').split(',')]:
@@ -111,7 +114,7 @@
except Exception, e:
self.checksLogger.exception('Unable to load custom check module %s' % module_spec)

-        self._event_checks = [Hudson(), Nagios(socket.gethostname())]
+        self._event_checks = [ElasticSearchClusterStatus(self.checksLogger), HAProxyEvents(self.checksLogger), Hudson(), Nagios(socket.gethostname())]
self._resources_checks = [ResProcesses(self.checksLogger,self.agentConfig)]

self._ec2 = EC2(self.checksLogger)
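
The wiring above splits the new checks by kind: HAProxyMetrics joins the metric checks, while ElasticSearchClusterStatus and HAProxyEvents become event checks, whose check() takes the logger as an extra argument and returns events instead of samples (see checks/db/elastic.py below). A rough sketch of how the event-check list is plausibly driven; the actual loop lives outside this diff, so its structure here is an assumption:

# Hypothetical driver for self._event_checks, not code from this commit.
events = []
for event_check in self._event_checks:
    # Event checks return False when unconfigured or failing,
    # otherwise a (possibly empty) list of event dicts.
    result = event_check.check(self.checksLogger, self.agentConfig)
    if result:
        events.extend(result)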
checks/db/elastic.py: 118 changes (104 additions, 14 deletions)
@@ -3,16 +3,89 @@
import socket
import subprocess
import sys
+from datetime import datetime
+import time

from checks import Check, gethostname
from util import json, headers

+HEALTH_URL = "/_cluster/health?pretty=true"
+STATS_URL = "/_cluster/nodes/stats?all=true"
+NODES_URL = "/_cluster/nodes?network=true"


+def _get_data(agentConfig, url):
+    "Hit a given URL and return the parsed json"
+    req = urllib2.Request(url, None, headers(agentConfig))
+    request = urllib2.urlopen(req)
+    response = request.read()
+    return json.loads(response)

class NodeNotFound(Exception): pass

+class ElasticSearchClusterStatus(Check):
+    def __init__(self, logger):
+        Check.__init__(self, logger)
+        self.cluster_status = None
+
+    def check(self, logger, config, data=None):
+        config_url = config.get("elasticsearch", None)
+
+        # Check if we are configured properly
+        if config_url is None:
+            return False
+
+        url = urlparse.urljoin(config_url, HEALTH_URL)
+        self.logger.info("Fetching elasticsearch data from: %s" % url)
+
+        try:
+            if not data:
+                data = _get_data(config, url)
+            if not self.cluster_status:
+                self.cluster_status = data['status']
+                if data['status'] in ["yellow", "red"]:
+                    event = self._create_event(config)
+                    return [event]
+                return []
+            if data['status'] != self.cluster_status:
+                self.cluster_status = data['status']
+                event = self._create_event(config)
+                return [event]
+            return []
+
+        except:
+            self.logger.exception('Unable to get elasticsearch statistics')
+            return False
+
+    def _create_event(self, agentConfig):
+        hostname = gethostname(agentConfig).decode('utf-8')
+        if self.cluster_status == "red" or self.cluster_status == "yellow":
+            alert_type = "error"
+            msg_title = "%s is %s" % (hostname, self.cluster_status)
+        else:
+            # then it should be green
+            alert_type = "info"
+            msg_title = "%s recovered as %s" % (hostname, self.cluster_status)
+
+        msg = "%s just reported as %s" % (hostname, self.cluster_status)
+
+        return {'timestamp': int(time.mktime(datetime.utcnow().timetuple())),
+                'event_type': 'elasticsearch',
+                'host': hostname,
+                'api_key': agentConfig['apiKey'],
+                'msg_text': msg,
+                'msg_title': msg_title,
+                "alert_type": alert_type,
+                "source_type": "Elasticsearch",
+                "event_object": hostname
+                }


class ElasticSearch(Check):

-    STATS_URL = "/_cluster/nodes/stats?all=true"
-    NODES_URL = "/_cluster/nodes?network=true"


METRICS = {
"elasticsearch.docs.count": ("gauge", "indices.docs.count"),
@@ -111,6 +184,13 @@ class ElasticSearch(Check):
"jvm.mem.non_heap_used": ("gauge", "jvm.mem.non_heap_used_in_bytes"),
"jvm.threads.count": ("gauge", "jvm.threads.count"),
"jvm.threads.peak_count": ("gauge", "jvm.threads.peak_count"),
"elasticsearch.number_of_nodes": ("gauge", "number_of_nodes"),
"elasticsearch.number_of_data_nodes": ("gauge", "number_of_data_nodes"),
"elasticsearch.active_primary_shards": ("gauge", "active_primary_shards"),
"elasticsearch.active_shards": ("gauge", "active_shards"),
"elasticsearch.relocating_shards": ("gauge", "relocating_shards"),
"elasticsearch.initializing_shards": ("gauge", "initializing_shards"),
"elasticsearch.unassigned_shards": ("gauge", "unassigned_shards"),
}

@classmethod
@@ -134,12 +214,6 @@ def generate_metric(name, xtype, *args):

self._map_metric(generate_metric)

-    def _get_data(self, agentConfig, url):
-        "Hit a given URL and return the parsed json"
-        req = urllib2.Request(url, None, headers(agentConfig))
-        request = urllib2.urlopen(req)
-        response = request.read()
-        return json.loads(response)

def _metric_not_found(self, metric, path):
self.logger.warning("Metric not found: %s -> %s", path, metric)
@@ -187,14 +261,21 @@ def process_metric(metric, xtype, path, xform=None):
# against the primary IP from ES
try:
base_url = self._base_es_url(agentConfig['elasticsearch'])
url = "%s%s" % (base_url, self.NODES_URL)
url = "%s%s" % (base_url, NODES_URL)
primary_addr = self._get_primary_addr(agentConfig, url, node)
except NodeNotFound:
# Skip any nodes that aren't found
continue
if self._host_matches_node(primary_addr):
self._map_metric(process_metric)

+    def _process_health_data(self, agentConfig, data):
+        def process_metric(metric, xtype, path, xform=None):
+            # closure over data
+            self._process_metric(data, metric, path, xform)
+        self._map_metric(process_metric)

def _get_primary_addr(self, agentConfig, url, node_name):
''' Returns a list of primary interface addresses as seen by ES.
Used in ES < 0.19
@@ -244,7 +325,7 @@ def _base_es_url(self, config_url):
return config_url
return "%s://%s" % (parsed.scheme, parsed.netloc)

-    def check(self, config):
+    def check(self, config, url_suffix=STATS_URL):
"""Extract data from stats URL
http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-stats.html
"""
@@ -259,15 +340,22 @@ def check(self, config, url_suffix=STATS_URL):
# If only the hostname was passed, accept that and add our stats_url
# Else use the full URL as provided
if urlparse.urlparse(config_url).path == "":
-            url = urlparse.urljoin(config_url, self.STATS_URL)
+            url = urlparse.urljoin(config_url, url_suffix)
else:
url = config_url

self.logger.info("Fetching elasticsearch data from: %s" % url)

try:
-            data = self._get_data(config, url)
-            self._process_data(config, data)
+            data = _get_data(config, url)
+
+            if url_suffix == STATS_URL:
+                self._process_data(config, data)
+                self.check(config, HEALTH_URL)
+            else:
+                self._process_health_data(config, data)
+

return self.get_metrics()
except:
self.logger.exception('Unable to get elasticsearch statistics')
@@ -281,4 +369,6 @@ def check(self, config):
logging.basicConfig()
logger = logging.getLogger()
c = ElasticSearch(logger)
pprint.pprint(c.check({"elasticsearch": "http://localhost:9200", "version": get_version()}))
config = {"elasticsearch": "http://localhost:9200", "version": get_version(), "apiKey":"apiKey 2"}
pprint.pprint(c.check(config))
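
Because ElasticSearchClusterStatus accepts a pre-fetched payload through its data argument, the status-transition logic can be exercised without a live cluster. Continuing the __main__ block above, a sketch (payloads are abbreviated to the one key the check reads, and the apiKey value is a placeholder):

    status_check = ElasticSearchClusterStatus(logger)
    event_config = {"elasticsearch": "http://localhost:9200", "apiKey": "placeholder"}
    # The first observation seeds cluster_status; a green cluster emits no event.
    pprint.pprint(status_check.check(logger, event_config, data={"status": "green"}))
    # A change away from the remembered status emits one event dict.
    pprint.pprint(status_check.check(logger, event_config, data={"status": "red"}))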

checks/net/__init__.py: empty file added
(Diffs for the remaining 9 changed files are not shown here.)
