aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java')
-rw-r--r--framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java234
1 files changed, 0 insertions, 234 deletions
diff --git a/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
deleted file mode 100644
index cae5455d..00000000
--- a/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.intentperf;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Collects and distributes performance samples.
- */
-@Component(immediate = true)
-@Service(value = IntentPerfCollector.class)
-public class IntentPerfCollector {
-
- private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
- private final Logger log = getLogger(getClass());
-
- private static final int MAX_SAMPLES = 1_000;
-
- private final List<Sample> samples = new LinkedList<>();
-
- private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService communicationService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected IntentPerfUi ui;
-
- // Auxiliary structures used to accrue data for normalized time interval
- // across all nodes.
- private long newestTime;
- private Sample overall;
- private Sample current;
-
- private ControllerNode[] nodes;
- private Map<NodeId, Integer> nodeToIndex;
-
- private NodeId nodeId;
-
- @Activate
- public void activate() {
- nodeId = clusterService.getLocalNode().id();
-
- communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
- getPoolThreadExecutor());
-
- nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
- Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
-
- nodeToIndex = new HashMap<>();
- for (int i = 0; i < nodes.length; i++) {
- nodeToIndex.put(nodes[i].id(), i);
- }
-
- clearSamples();
- ui.setCollector(this);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- communicationService.removeSubscriber(SAMPLE);
- log.info("Stopped");
- }
-
- /**
- * Clears all previously accumulated data.
- */
- public synchronized void clearSamples() {
- newestTime = 0;
- overall = new Sample(0, nodes.length);
- current = new Sample(0, nodes.length);
- samples.clear();
- }
-
-
- /**
- * Records a sample point of data about intent operation rate.
- *
- * @param overallRate overall rate
- * @param currentRate current rate
- */
- public void recordSample(double overallRate, double currentRate) {
- long now = System.currentTimeMillis();
- addSample(now, nodeId, overallRate, currentRate);
- broadcastSample(now, nodeId, overallRate, currentRate);
- }
-
- /**
- * Returns set of node ids as headers.
- *
- * @return node id headers
- */
- public List<String> getSampleHeaders() {
- List<String> headers = new ArrayList<>();
- for (ControllerNode node : nodes) {
- headers.add(node.id().toString());
- }
- return headers;
- }
-
- /**
- * Returns set of all accumulated samples normalized to the local set of
- * samples.
- *
- * @return accumulated samples
- */
- public synchronized List<Sample> getSamples() {
- return ImmutableList.copyOf(samples);
- }
-
- /**
- * Returns overall throughput performance for each of the cluster nodes.
- *
- * @return overall intent throughput
- */
- public synchronized Sample getOverall() {
- return overall;
- }
-
- // Records a new sample to our collection of samples
- private synchronized void addSample(long time, NodeId nodeId,
- double overallRate, double currentRate) {
- Sample fullSample = createCurrentSampleIfNeeded(time);
- setSampleData(current, nodeId, currentRate);
- setSampleData(overall, nodeId, overallRate);
- pruneSamplesIfNeeded();
-
- if (fullSample != null && ui != null) {
- ui.reportSample(fullSample);
- }
- }
-
- private Sample createCurrentSampleIfNeeded(long time) {
- Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
- if (oldSample != null) {
- newestTime = time;
- current = new Sample(time, nodes.length);
- if (oldSample.time > 0) {
- samples.add(oldSample);
- }
- }
- return oldSample;
- }
-
- private void setSampleData(Sample sample, NodeId nodeId, double data) {
- Integer index = nodeToIndex.get(nodeId);
- if (index != null) {
- sample.data[index] = data;
- }
- }
-
- private void pruneSamplesIfNeeded() {
- if (samples.size() > MAX_SAMPLES) {
- samples.remove(0);
- }
- }
-
- // Performance data sample.
- static class Sample {
- final long time;
- final double[] data;
-
- public Sample(long time, int nodeCount) {
- this.time = time;
- this.data = new double[nodeCount];
- Arrays.fill(data, -1);
- }
-
- public boolean isComplete() {
- for (int i = 0; i < data.length; i++) {
- if (data[i] < 0) {
- return false;
- }
- }
- return true;
- }
- }
-
- private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
- String data = String.format("%d|%f|%f", time, overallRate, currentRate);
- communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
- }
-
- private class InternalSampleCollector implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- String[] fields = new String(message.payload()).split("\\|");
- log.debug("Received sample from {}: {}", message.sender(), fields);
- addSample(Long.parseLong(fields[0]), message.sender(),
- Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
- }
- }
-}