summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf/carbon/emitter.py
diff options
context:
space:
mode:
Diffstat (limited to 'docker/storperf-master/storperf/carbon/emitter.py')
-rw-r--r--docker/storperf-master/storperf/carbon/emitter.py75
1 files changed, 61 insertions, 14 deletions
diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py
index e23dc79..05f6c3c 100644
--- a/docker/storperf-master/storperf/carbon/emitter.py
+++ b/docker/storperf-master/storperf/carbon/emitter.py
@@ -11,28 +11,75 @@ import logging
import socket
import time
+from storperf.db.graphite_db import GraphiteDB
+
class CarbonMetricTransmitter():
- carbon_host = '127.0.0.1'
- carbon_port = 2003
+ carbon_servers = [('127.0.0.1', 2003),
+ ('storperf-graphite', 2003)]
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.graphite_db = GraphiteDB()
+ self.commit_markers = {}
- def transmit_metrics(self, metrics):
- if 'timestamp' in metrics:
- metrics.pop('timestamp')
+ def transmit_metrics(self, metrics, commit_marker):
timestamp = str(calendar.timegm(time.gmtime()))
+ self.commit_markers[commit_marker] = int(timestamp)
+
+ carbon_socket = None
+
+ for host, port in self.carbon_servers:
+ try:
+ carbon_socket = socket.socket(socket.AF_INET,
+ socket.SOCK_STREAM)
+ carbon_socket.connect((host, port))
+
+ for key, value in metrics.items():
+ try:
+ float(value)
+ message = "%s %s %s\n" \
+ % (key, value, timestamp)
+ self.logger.debug("Metric: " + message.strip())
+ carbon_socket.send(message)
+ except ValueError:
+ self.logger.debug("Ignoring non numeric metric %s %s"
+ % (key, value))
+
+ message = "%s.commit-marker %s %s\n" \
+ % (commit_marker, timestamp, timestamp)
+ carbon_socket.send(message)
+ self.logger.debug("Marker %s" % message.strip())
+ self.logger.info("Sent metrics to %s:%s with timestamp %s"
+ % (host, port, timestamp))
+
+ except Exception, e:
+ self.logger.error("While notifying carbon %s:%s %s"
+ % (host, port, e))
+
+ if carbon_socket is not None:
+ carbon_socket.close()
+
+ def confirm_commit(self, commit_marker):
+ marker_timestamp = self.commit_markers[commit_marker]
+ request = "%s.commit-marker&from=%s" \
+ % (commit_marker, marker_timestamp - 60)
+ marker_data = self.graphite_db.fetch_item(request)
+ self.logger.debug("Marker data %s" % marker_data)
+ fetched_timestamps = self.parse_timestamp(marker_data)
- carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- carbon_socket.connect((self.carbon_host, self.carbon_port))
+ return marker_timestamp in fetched_timestamps
- for key, metric in metrics.items():
- message = key + " " + metric + " " + timestamp
- self.logger.debug("Metric: " + message)
- carbon_socket.send(message + '\n')
+ def parse_timestamp(self, marker_data):
+ timestamps = []
+ if (type(marker_data) is list and
+ len(marker_data) > 0):
+ datapoints = marker_data[0]['datapoints']
+ for datapoint in datapoints:
+ try:
+ timestamps.append(int(datapoint[0]))
+ except Exception:
+ pass
- carbon_socket.close()
- self.logger.info("Sent metrics to carbon with timestamp %s"
- % timestamp)
+ return timestamps