aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java289
1 files changed, 0 insertions, 289 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
deleted file mode 100644
index 0cd4a831..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.statistic.impl;
-
-import com.google.common.base.Objects;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.statistic.FlowStatisticStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.slf4j.Logger;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Maintains flow statistics using RPC calls to collect stats from remote instances
- * on demand.
- */
-@Component(immediate = true)
-@Service
-public class DistributedFlowStatisticStore implements FlowStatisticStore {
- private final Logger log = getLogger(getClass());
-
- // TODO: Make configurable.
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- private Map<ConnectPoint, Set<FlowEntry>> previous =
- new ConcurrentHashMap<>();
-
- private Map<ConnectPoint, Set<FlowEntry>> current =
- new ConcurrentHashMap<>();
-
- protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- // register this store specific classes here
- .build();
- }
- };
-
- private NodeId local;
- private ExecutorService messageHandlingExecutor;
-
- private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
-
- @Activate
- public void activate() {
- local = clusterService.getLocalNode().id();
-
- messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
- groupedThreads("onos/store/statistic", "message-handlers"));
-
- clusterCommunicator.addSubscriber(
- GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
- messageHandlingExecutor);
-
- clusterCommunicator.addSubscriber(
- GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
- messageHandlingExecutor);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- clusterCommunicator.removeSubscriber(GET_PREVIOUS);
- clusterCommunicator.removeSubscriber(GET_CURRENT);
- messageHandlingExecutor.shutdown();
- log.info("Stopped");
- }
-
- @Override
- public synchronized void removeFlowStatistic(FlowRule rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
-
- // remove this rule if present from current map
- current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
-
- // remove this on if present from previous map
- previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
- }
-
- @Override
- public synchronized void addFlowStatistic(FlowEntry rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
-
- // create one if absent and add this rule
- current.putIfAbsent(cp, new HashSet<>());
- current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
-
- // remove previous one if present
- previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
- }
-
- public synchronized void updateFlowStatistic(FlowEntry rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
-
- Set<FlowEntry> curr = current.get(cp);
- if (curr == null) {
- addFlowStatistic(rule);
- } else {
- Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
- findAny();
- if (f.isPresent() && rule.bytes() < f.get().bytes()) {
- log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
- " Invalid Flow Update! Will be removed!!" +
- " curr flowId=" + Long.toHexString(rule.id().value()) +
- ", prev flowId=" + Long.toHexString(f.get().id().value()) +
- ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
- ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
- ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
- // something is wrong! invalid flow entry, so delete it
- removeFlowStatistic(rule);
- return;
- }
- Set<FlowEntry> prev = previous.get(cp);
- if (prev == null) {
- prev = new HashSet<>();
- previous.put(cp, prev);
- }
-
- // previous one is exist
- if (f.isPresent()) {
- // remove old one and add new one
- prev.remove(rule);
- if (!prev.add(f.get())) {
- log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
- " flowId={}, add failed into previous.",
- Long.toHexString(rule.id().value()));
- }
- }
-
- // remove old one and add new one
- curr.remove(rule);
- if (!curr.add(rule)) {
- log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
- " flowId={}, add failed into current.",
- Long.toHexString(rule.id().value()));
- }
- }
- }
-
- @Override
- public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
- final DeviceId deviceId = connectPoint.deviceId();
-
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
- log.warn("No master for {}", deviceId);
- return Collections.emptySet();
- }
-
- if (Objects.equal(local, master)) {
- return getCurrentStatisticInternal(connectPoint);
- } else {
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
- connectPoint,
- GET_CURRENT,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master),
- STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptySet());
- }
- }
-
- private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
- return current.get(connectPoint);
- }
-
- @Override
- public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
- final DeviceId deviceId = connectPoint.deviceId();
-
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
- log.warn("No master for {}", deviceId);
- return Collections.emptySet();
- }
-
- if (Objects.equal(local, master)) {
- return getPreviousStatisticInternal(connectPoint);
- } else {
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
- connectPoint,
- GET_PREVIOUS,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master),
- STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptySet());
- }
- }
-
- private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
- return previous.get(connectPoint);
- }
-
- private ConnectPoint buildConnectPoint(FlowRule rule) {
- PortNumber port = getOutput(rule);
-
- if (port == null) {
- return null;
- }
- ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
- return cp;
- }
-
- private PortNumber getOutput(FlowRule rule) {
- for (Instruction i : rule.treatment().allInstructions()) {
- if (i.type() == Instruction.Type.OUTPUT) {
- Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
- return out.port();
- }
- if (i.type() == Instruction.Type.DROP) {
- return PortNumber.P0;
- }
- }
- return null;
- }
-} \ No newline at end of file