diff options
Diffstat (limited to 'framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java')
-rw-r--r-- | framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java new file mode 100644 index 00000000..8c160e85 --- /dev/null +++ b/framework/src/onos/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java @@ -0,0 +1,234 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.intentperf; + +import com.google.common.collect.ImmutableList; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.NodeId; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterMessage; +import org.onosproject.store.cluster.messaging.ClusterMessageHandler; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.onlab.util.SharedExecutors.getPoolThreadExecutor; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Collects and distributes performance samples. + */ +@Component(immediate = true) +@Service(value = IntentPerfCollector.class) +public class IntentPerfCollector { + + private static final long SAMPLE_TIME_WINDOW_MS = 5_000; + private final Logger log = getLogger(getClass()); + + private static final int MAX_SAMPLES = 1_000; + + private final List<Sample> samples = new LinkedList<>(); + + private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample"); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService communicationService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected IntentPerfUi ui; + + // Auxiliary structures used to accrue data for normalized time interval + // across all nodes. + private long newestTime; + private Sample overall; + private Sample current; + + private ControllerNode[] nodes; + private Map<NodeId, Integer> nodeToIndex; + + private NodeId nodeId; + + @Activate + public void activate() { + nodeId = clusterService.getLocalNode().id(); + + communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(), + getPoolThreadExecutor()); + + nodes = clusterService.getNodes().toArray(new ControllerNode[]{}); + Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString())); + + nodeToIndex = new HashMap<>(); + for (int i = 0; i < nodes.length; i++) { + nodeToIndex.put(nodes[i].id(), i); + } + + clearSamples(); + ui.setCollector(this); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + communicationService.removeSubscriber(SAMPLE); + log.info("Stopped"); + } + + /** + * Clears all previously accumulated data. + */ + public void clearSamples() { + newestTime = 0; + overall = new Sample(0, nodes.length); + current = new Sample(0, nodes.length); + samples.clear(); + } + + + /** + * Records a sample point of data about intent operation rate. + * + * @param overallRate overall rate + * @param currentRate current rate + */ + public void recordSample(double overallRate, double currentRate) { + long now = System.currentTimeMillis(); + addSample(now, nodeId, overallRate, currentRate); + broadcastSample(now, nodeId, overallRate, currentRate); + } + + /** + * Returns set of node ids as headers. + * + * @return node id headers + */ + public List<String> getSampleHeaders() { + List<String> headers = new ArrayList<>(); + for (ControllerNode node : nodes) { + headers.add(node.id().toString()); + } + return headers; + } + + /** + * Returns set of all accumulated samples normalized to the local set of + * samples. + * + * @return accumulated samples + */ + public synchronized List<Sample> getSamples() { + return ImmutableList.copyOf(samples); + } + + /** + * Returns overall throughput performance for each of the cluster nodes. + * + * @return overall intent throughput + */ + public synchronized Sample getOverall() { + return overall; + } + + // Records a new sample to our collection of samples + private synchronized void addSample(long time, NodeId nodeId, + double overallRate, double currentRate) { + Sample fullSample = createCurrentSampleIfNeeded(time); + setSampleData(current, nodeId, currentRate); + setSampleData(overall, nodeId, overallRate); + pruneSamplesIfNeeded(); + + if (fullSample != null && ui != null) { + ui.reportSample(fullSample); + } + } + + private Sample createCurrentSampleIfNeeded(long time) { + Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null; + if (oldSample != null) { + newestTime = time; + current = new Sample(time, nodes.length); + if (oldSample.time > 0) { + samples.add(oldSample); + } + } + return oldSample; + } + + private void setSampleData(Sample sample, NodeId nodeId, double data) { + Integer index = nodeToIndex.get(nodeId); + if (index != null) { + sample.data[index] = data; + } + } + + private void pruneSamplesIfNeeded() { + if (samples.size() > MAX_SAMPLES) { + samples.remove(0); + } + } + + // Performance data sample. + static class Sample { + final long time; + final double[] data; + + public Sample(long time, int nodeCount) { + this.time = time; + this.data = new double[nodeCount]; + Arrays.fill(data, -1); + } + + public boolean isComplete() { + for (int i = 0; i < data.length; i++) { + if (data[i] < 0) { + return false; + } + } + return true; + } + } + + private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) { + String data = String.format("%d|%f|%f", time, overallRate, currentRate); + communicationService.broadcast(data, SAMPLE, str -> str.getBytes()); + } + + private class InternalSampleCollector implements ClusterMessageHandler { + @Override + public void handle(ClusterMessage message) { + String[] fields = new String(message.payload()).split("\\|"); + log.debug("Received sample from {}: {}", message.sender(), fields); + addSample(Long.parseLong(fields[0]), message.sender(), + Double.parseDouble(fields[1]), Double.parseDouble(fields[2])); + } + } +} |