Diffstat (limited to 'clover/spark/src/main/scala/CloverSlow.scala')
-rw-r--r--  clover/spark/src/main/scala/CloverSlow.scala  232
1 file changed, 232 insertions, 0 deletions
diff --git a/clover/spark/src/main/scala/CloverSlow.scala b/clover/spark/src/main/scala/CloverSlow.scala
new file mode 100644
index 0000000..1866d72
--- /dev/null
+++ b/clover/spark/src/main/scala/CloverSlow.scala
@@ -0,0 +1,232 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+
+import org.apache.spark.sql.SparkSession
+import com.datastax.spark.connector._
+import org.apache.spark.sql.cassandra._
+
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+
+import com.redis._
+
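+// CloverSlow repeatedly runs Spark SQL aggregations over the tracing spans
+// and metrics stored in the Cassandra "visibility" keyspace and publishes
+// per-service statistics (response times, URL/user-agent/status-code counts,
+// recent metric values) to Redis.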
+object CloverSlow {
+  def main(args: Array[String]): Unit = {
+
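+    // Cluster-internal endpoints; these look like Kubernetes
+    // <service>.<namespace> DNS names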
+ val CassandraConnect = "cassandra.clover-system"
+ val RedisConnect = "redis.default"
+
+
+ // Enable/disable various analytics
+ val distinct_url_service = false
+ val response_times = true
+
+ // Cassandra, Redis, Spark Context
+    val conf = new SparkConf(true)
+      .set("spark.cassandra.connection.host", CassandraConnect)
+ val redis = new RedisClient(RedisConnect, 6379)
+ val sc = new SparkContext(conf)
+
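+    // SparkSession.getOrCreate() below reuses the SparkContext created above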
+ val spark = SparkSession
+ .builder()
+ .appName("Clover Visibility Stats")
+ .config("spark.cassandra.connection.host", CassandraConnect)
+ .config("spark.cassandra.connection.port", "9042")
+ .getOrCreate()
+
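+    // Service names to report on, registered in the "visibility_services"
+    // Redis set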
+ val services = redis.smembers("visibility_services")
+
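+    // Expose the Cassandra visibility.spans table to Spark SQL as "curspans"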
+ spark
+ .read.cassandraFormat("spans", "visibility")
+ .load()
+ .createOrReplaceTempView("curspans")
+
+ if (distinct_url_service) {
+ // Get number of distinct URLs per service (node_id)
+ for (s <- services.get) {
+ val service = s.get
+ val perurl = spark.sql(
+ s"""
+ |SELECT node_id,count(distinct http_url)
+ |as urls,collect_set(http_url) as values
+ |FROM curspans
+ |WHERE node_id LIKE '%$service%'
+ |GROUP BY node_id
+ """.stripMargin)
+      for (row <- perurl.collect) {
+ println(row)
+ val node_id = row.get(0)
+ val url_count = row.get(1)
+ val url_distinct = row.getList(2).toString
+ redis.hmset(service, Map("node_id" -> node_id,
+ "url_count" -> url_count,
+ "url_distinct" -> url_distinct))
+ }
+ }
+ }
+
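+    // Main analytics pass, repeated a fixed 500 times per job run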
+    for (_ <- 1 to 500) {
+
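+      // Average/min/max response time per service from inbound span
+      // durations; the /1000.0 presumably converts microseconds to ms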
+ if (response_times) {
+ try {
+ for (s <- services.get) {
+ val service = s.get.replace('_', '-')
+ val service_rt = spark.sql(
+ s"""
+ |SELECT avg(duration),min(duration),max(duration)
+ |FROM curspans
+ |WHERE node_id LIKE '%$service%'
+ |AND upstream_cluster LIKE '%inbound%'
+ """.stripMargin)
+          // An aggregate over zero matching spans still returns one
+          // all-null row, so test the aggregate for null instead of count()
+          val rt_row = service_rt.first
+          if (!rt_row.isNullAt(0)) {
+            val avg_rt = rt_row.getDouble(0) / 1000.0
+            val min_rt = rt_row.getInt(1) / 1000.0
+            val max_rt = rt_row.getInt(2) / 1000.0
+ redis.hmset(service, Map("avg_rt" -> f"$avg_rt%1.2f",
+ "min_rt" -> f"$min_rt%1.2f",
+ "max_rt" -> f"$max_rt%1.2f"))
+ } else {
+ redis.hmset(service, Map("avg_rt" -> "NA",
+ "min_rt" -> "NA",
+ "max_rt" -> "NA"))
+ }
+ }
+ } catch {
+        case unknown: Throwable => println("RT exception: " + unknown)
+ }
+ }
+
+      // Per-URL span counts across all nodes
+ val urlcount = spark.sql(
+ s"""
+ |SELECT http_url,count(http_url) as urls FROM curspans
+ |GROUP BY http_url
+ """.stripMargin)
+ redis.del("span_urls")
+ redis.del("span_urls_z")
+      for (row <- urlcount.collect) {
+ redis.sadd("span_urls", row.get(0))
+ redis.zadd("span_urls_z", row.getLong(1).toDouble, row.get(0))
+ }
+
+      // User-Agent span counts across all nodes
+ val uacount = spark.sql(
+ s"""
+ |SELECT user_agent,count(user_agent) as ua FROM curspans
+ |GROUP BY user_agent
+ """.stripMargin)
+ redis.del("span_user_agent")
+ redis.del("span_user_agent_z")
+      for (row <- uacount.collect) {
+ redis.sadd("span_user_agent", row.get(0))
+ redis.zadd("span_user_agent_z", row.getLong(1).toDouble,
+ row.get(0))
+ }
+
+      // Span counts per node_id
+ val nodecount = spark.sql(
+ s"""
+ |SELECT node_id,count(node_id) as node FROM curspans
+ |GROUP BY node_id
+ """.stripMargin)
+ redis.del("span_node_id")
+ redis.del("span_node_id_z")
+      for (row <- nodecount.collect) {
+ redis.sadd("span_node_id", row.get(0))
+ redis.zadd("span_node_id_z", row.getLong(1).toDouble, row.get(0))
+ }
+
+      // Status-code counts per URL across all nodes
+ val statuscount = spark.sql(
+ s"""
+ |SELECT http_url,status_code,count(status_code) as urls
+ |FROM curspans
+ |GROUP BY http_url,status_code
+ """.stripMargin)
+ redis.del("span_status_codes_z")
+      for (row <- statuscount.collect) {
+ val key_url_code = row.get(1) + ", " + row.get(0)
+ redis.zadd("span_status_codes_z", row.getLong(2).toDouble,
+ key_url_code)
+ }
+
+ // Per Service/URL counts
+ val node_url_count = spark.sql(
+ s"""
+ |SELECT node_id,http_url,count(http_url) as urls
+ |FROM curspans
+ |GROUP BY node_id,http_url
+ """.stripMargin)
+ redis.del("span_node_url_z")
+      for (row <- node_url_count.collect) {
+ val key_node_url = row.get(0) + ", " + row.get(1)
+ redis.zadd("span_node_url_z", row.getLong(2).toDouble,
+ key_node_url)
+ }
+
+ // Distinct span fields
+ val distinct_keys = List("operation_name", "upstream_cluster",
+ "status_code")
+ for (field <- distinct_keys) {
+ val distinct_span = spark.sql(
+ s"""
+ |SELECT $field FROM curspans
+ |GROUP BY $field
+ """.stripMargin)
+ val dk = "span_" + field
+ redis.del(dk)
+        for (row <- distinct_span.collect) {
+ redis.sadd(dk, row.get(0))
+ }
+ }
+
+ // Metrics, per service
+ spark
+ .read.cassandraFormat("metrics", "visibility")
+ .load()
+ .createOrReplaceTempView("curmetrics")
+
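+      // Full metric names are assembled as <prefix><service><suffix> from
+      // fragments registered in Redis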
+ val metric_prefixes = redis.smembers("metric_prefixes")
+ val metric_suffixes = redis.smembers("metric_suffixes")
+
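+      // Mirror the 100 most recent values of each metric into a Redis list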
+ try {
+ for (s <- services.get) {
+        // Note: metric names keep the underscores in the service name,
+        // unlike the response-time lookup above
+ val service = s.get
+ for (m_prefix <- metric_prefixes.get) {
+ val mp = m_prefix.get
+ for (m_suffix <- metric_suffixes.get) {
+ val ms = m_suffix.get
+ val metric_result = spark.sql(
+ s"""
+ |SELECT m_value FROM curmetrics
+ |WHERE m_name = '$mp$service$ms'
+ |ORDER BY m_time DESC LIMIT 100
+ """.stripMargin)
+ val metric_key = "metrics_" + mp + service + ms
+ redis.del(metric_key)
+              for (row <- metric_result.collect) {
+ redis.lpush(metric_key, row.get(0))
+ }
+ }
+ }
+ }
+ } catch {
+        case unknown: Throwable => println("Metrics exception: " + unknown)
+ }
+ }
+
+ }
+}