aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java317
1 files changed, 0 insertions, 317 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
deleted file mode 100644
index 35bdad14..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.statistic.impl;
-
-import com.google.common.collect.Sets;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.statistic.StatisticStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.slf4j.Logger;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
-import static org.slf4j.LoggerFactory.getLogger;
-
-
-/**
- * Maintains statistics using RPC calls to collect stats from remote instances
- * on demand.
- */
-@Component(immediate = true)
-@Service
-public class DistributedStatisticStore implements StatisticStore {
-
- private final Logger log = getLogger(getClass());
-
- // TODO: Make configurable.
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- private Map<ConnectPoint, InternalStatisticRepresentation> representations =
- new ConcurrentHashMap<>();
-
- private Map<ConnectPoint, Set<FlowEntry>> previous =
- new ConcurrentHashMap<>();
-
- private Map<ConnectPoint, Set<FlowEntry>> current =
- new ConcurrentHashMap<>();
-
- protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- // register this store specific classes here
- .build();
- }
- };
-
- private ExecutorService messageHandlingExecutor;
-
- private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
-
- @Activate
- public void activate() {
-
- messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
- groupedThreads("onos/store/statistic", "message-handlers"));
-
- clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
- SERIALIZER::decode,
- this::getCurrentStatisticInternal,
- SERIALIZER::encode,
- messageHandlingExecutor);
-
- clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
- SERIALIZER::decode,
- this::getPreviousStatisticInternal,
- SERIALIZER::encode,
- messageHandlingExecutor);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- clusterCommunicator.removeSubscriber(GET_PREVIOUS);
- clusterCommunicator.removeSubscriber(GET_CURRENT);
- messageHandlingExecutor.shutdown();
- log.info("Stopped");
- }
-
- @Override
- public void prepareForStatistics(FlowRule rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
- InternalStatisticRepresentation rep;
- synchronized (representations) {
- rep = getOrCreateRepresentation(cp);
- }
- rep.prepare();
- }
-
- @Override
- public synchronized void removeFromStatistics(FlowRule rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
- InternalStatisticRepresentation rep = representations.get(cp);
- if (rep != null && rep.remove(rule)) {
- updatePublishedStats(cp, Collections.emptySet());
- }
- Set<FlowEntry> values = current.get(cp);
- if (values != null) {
- values.remove(rule);
- }
- values = previous.get(cp);
- if (values != null) {
- values.remove(rule);
- }
-
- }
-
- @Override
- public void addOrUpdateStatistic(FlowEntry rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
- InternalStatisticRepresentation rep = representations.get(cp);
- if (rep != null && rep.submit(rule)) {
- updatePublishedStats(cp, rep.get());
- }
- }
-
- private synchronized void updatePublishedStats(ConnectPoint cp,
- Set<FlowEntry> flowEntries) {
- Set<FlowEntry> curr = current.get(cp);
- if (curr == null) {
- curr = new HashSet<>();
- }
- previous.put(cp, curr);
- current.put(cp, flowEntries);
-
- }
-
- @Override
- public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
- final DeviceId deviceId = connectPoint.deviceId();
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
- log.warn("No master for {}", deviceId);
- return Collections.emptySet();
- }
- if (master.equals(clusterService.getLocalNode().id())) {
- return getCurrentStatisticInternal(connectPoint);
- } else {
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
- connectPoint,
- GET_CURRENT,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master),
- STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptySet());
- }
-
- }
-
- private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
- return current.get(connectPoint);
- }
-
- @Override
- public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
- final DeviceId deviceId = connectPoint.deviceId();
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
- log.warn("No master for {}", deviceId);
- return Collections.emptySet();
- }
- if (master.equals(clusterService.getLocalNode().id())) {
- return getPreviousStatisticInternal(connectPoint);
- } else {
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
- connectPoint,
- GET_PREVIOUS,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master),
- STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptySet());
- }
- }
-
- private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
- return previous.get(connectPoint);
- }
-
- private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
-
- if (representations.containsKey(cp)) {
- return representations.get(cp);
- } else {
- InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
- representations.put(cp, rep);
- return rep;
- }
-
- }
-
- private ConnectPoint buildConnectPoint(FlowRule rule) {
- PortNumber port = getOutput(rule);
-
- if (port == null) {
- return null;
- }
- ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
- return cp;
- }
-
- private PortNumber getOutput(FlowRule rule) {
- for (Instruction i : rule.treatment().allInstructions()) {
- if (i.type() == Instruction.Type.OUTPUT) {
- Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
- return out.port();
- }
- if (i.type() == Instruction.Type.DROP) {
- return PortNumber.P0;
- }
- }
- return null;
- }
-
- private class InternalStatisticRepresentation {
-
- private final AtomicInteger counter = new AtomicInteger(0);
- private final Set<FlowEntry> rules = new HashSet<>();
-
- public void prepare() {
- counter.incrementAndGet();
- }
-
- public synchronized boolean remove(FlowRule rule) {
- rules.remove(rule);
- return counter.decrementAndGet() == 0;
- }
-
- public synchronized boolean submit(FlowEntry rule) {
- if (rules.contains(rule)) {
- rules.remove(rule);
- }
- rules.add(rule);
- if (counter.get() == 0) {
- return true;
- } else {
- return counter.decrementAndGet() == 0;
- }
- }
-
- public synchronized Set<FlowEntry> get() {
- counter.set(rules.size());
- return Sets.newHashSet(rules);
- }
-
-
- }
-
-}