aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java')
-rw-r--r--framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java234
1 files changed, 234 insertions, 0 deletions
diff --git a/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
new file mode 100644
index 00000000..8c160e85
--- /dev/null
+++ b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Collects and distributes performance samples.
+ */
+@Component(immediate = true)
+@Service(value = IntentPerfCollector.class)
+public class IntentPerfCollector {
+
+ private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
+ private final Logger log = getLogger(getClass());
+
+ private static final int MAX_SAMPLES = 1_000;
+
+ private final List<Sample> samples = new LinkedList<>();
+
+ private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService communicationService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected IntentPerfUi ui;
+
+ // Auxiliary structures used to accrue data for normalized time interval
+ // across all nodes.
+ private long newestTime;
+ private Sample overall;
+ private Sample current;
+
+ private ControllerNode[] nodes;
+ private Map<NodeId, Integer> nodeToIndex;
+
+ private NodeId nodeId;
+
+ @Activate
+ public void activate() {
+ nodeId = clusterService.getLocalNode().id();
+
+ communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
+ getPoolThreadExecutor());
+
+ nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
+ Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
+
+ nodeToIndex = new HashMap<>();
+ for (int i = 0; i < nodes.length; i++) {
+ nodeToIndex.put(nodes[i].id(), i);
+ }
+
+ clearSamples();
+ ui.setCollector(this);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ communicationService.removeSubscriber(SAMPLE);
+ log.info("Stopped");
+ }
+
+ /**
+ * Clears all previously accumulated data.
+ */
+ public void clearSamples() {
+ newestTime = 0;
+ overall = new Sample(0, nodes.length);
+ current = new Sample(0, nodes.length);
+ samples.clear();
+ }
+
+
+ /**
+ * Records a sample point of data about intent operation rate.
+ *
+ * @param overallRate overall rate
+ * @param currentRate current rate
+ */
+ public void recordSample(double overallRate, double currentRate) {
+ long now = System.currentTimeMillis();
+ addSample(now, nodeId, overallRate, currentRate);
+ broadcastSample(now, nodeId, overallRate, currentRate);
+ }
+
+ /**
+ * Returns set of node ids as headers.
+ *
+ * @return node id headers
+ */
+ public List<String> getSampleHeaders() {
+ List<String> headers = new ArrayList<>();
+ for (ControllerNode node : nodes) {
+ headers.add(node.id().toString());
+ }
+ return headers;
+ }
+
+ /**
+ * Returns set of all accumulated samples normalized to the local set of
+ * samples.
+ *
+ * @return accumulated samples
+ */
+ public synchronized List<Sample> getSamples() {
+ return ImmutableList.copyOf(samples);
+ }
+
+ /**
+ * Returns overall throughput performance for each of the cluster nodes.
+ *
+ * @return overall intent throughput
+ */
+ public synchronized Sample getOverall() {
+ return overall;
+ }
+
+ // Records a new sample to our collection of samples
+ private synchronized void addSample(long time, NodeId nodeId,
+ double overallRate, double currentRate) {
+ Sample fullSample = createCurrentSampleIfNeeded(time);
+ setSampleData(current, nodeId, currentRate);
+ setSampleData(overall, nodeId, overallRate);
+ pruneSamplesIfNeeded();
+
+ if (fullSample != null && ui != null) {
+ ui.reportSample(fullSample);
+ }
+ }
+
+ private Sample createCurrentSampleIfNeeded(long time) {
+ Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
+ if (oldSample != null) {
+ newestTime = time;
+ current = new Sample(time, nodes.length);
+ if (oldSample.time > 0) {
+ samples.add(oldSample);
+ }
+ }
+ return oldSample;
+ }
+
+ private void setSampleData(Sample sample, NodeId nodeId, double data) {
+ Integer index = nodeToIndex.get(nodeId);
+ if (index != null) {
+ sample.data[index] = data;
+ }
+ }
+
+ private void pruneSamplesIfNeeded() {
+ if (samples.size() > MAX_SAMPLES) {
+ samples.remove(0);
+ }
+ }
+
+ // Performance data sample.
+ static class Sample {
+ final long time;
+ final double[] data;
+
+ public Sample(long time, int nodeCount) {
+ this.time = time;
+ this.data = new double[nodeCount];
+ Arrays.fill(data, -1);
+ }
+
+ public boolean isComplete() {
+ for (int i = 0; i < data.length; i++) {
+ if (data[i] < 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
+ String data = String.format("%d|%f|%f", time, overallRate, currentRate);
+ communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
+ }
+
+ private class InternalSampleCollector implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ String[] fields = new String(message.payload()).split("\\|");
+ log.debug("Received sample from {}: {}", message.sender(), fields);
+ addSample(Long.parseLong(fields[0]), message.sender(),
+ Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
+ }
+ }
+}