aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java289
1 files changed, 289 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
new file mode 100644
index 00000000..0cd4a831
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.statistic.impl;
+
+import com.google.common.base.Objects;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.statistic.FlowStatisticStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Maintains flow statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedFlowStatisticStore implements FlowStatisticStore {
+ private final Logger log = getLogger(getClass());
+
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private Map<ConnectPoint, Set<FlowEntry>> previous =
+ new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> current =
+ new ConcurrentHashMap<>();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ // register this store specific classes here
+ .build();
+ }
+ };
+
+ private NodeId local;
+ private ExecutorService messageHandlingExecutor;
+
+ private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
+
+ @Activate
+ public void activate() {
+ local = clusterService.getLocalNode().id();
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/statistic", "message-handlers"));
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(GET_PREVIOUS);
+ clusterCommunicator.removeSubscriber(GET_CURRENT);
+ messageHandlingExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public synchronized void removeFlowStatistic(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // remove this rule if present from current map
+ current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+
+ // remove this on if present from previous map
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ @Override
+ public synchronized void addFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // create one if absent and add this rule
+ current.putIfAbsent(cp, new HashSet<>());
+ current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
+
+ // remove previous one if present
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ public synchronized void updateFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ Set<FlowEntry> curr = current.get(cp);
+ if (curr == null) {
+ addFlowStatistic(rule);
+ } else {
+ Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
+ findAny();
+ if (f.isPresent() && rule.bytes() < f.get().bytes()) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " Invalid Flow Update! Will be removed!!" +
+ " curr flowId=" + Long.toHexString(rule.id().value()) +
+ ", prev flowId=" + Long.toHexString(f.get().id().value()) +
+ ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
+ ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
+ ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
+ // something is wrong! invalid flow entry, so delete it
+ removeFlowStatistic(rule);
+ return;
+ }
+ Set<FlowEntry> prev = previous.get(cp);
+ if (prev == null) {
+ prev = new HashSet<>();
+ previous.put(cp, prev);
+ }
+
+ // previous one is exist
+ if (f.isPresent()) {
+ // remove old one and add new one
+ prev.remove(rule);
+ if (!prev.add(f.get())) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into previous.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+
+ // remove old one and add new one
+ curr.remove(rule);
+ if (!curr.add(rule)) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into current.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+ }
+
+ @Override
+ public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getCurrentStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_CURRENT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+ return current.get(connectPoint);
+ }
+
+ @Override
+ public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getPreviousStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_PREVIOUS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+ return previous.get(connectPoint);
+ }
+
+ private ConnectPoint buildConnectPoint(FlowRule rule) {
+ PortNumber port = getOutput(rule);
+
+ if (port == null) {
+ return null;
+ }
+ ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+ return cp;
+ }
+
+ private PortNumber getOutput(FlowRule rule) {
+ for (Instruction i : rule.treatment().allInstructions()) {
+ if (i.type() == Instruction.Type.OUTPUT) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+ return out.port();
+ }
+ if (i.type() == Instruction.Type.DROP) {
+ return PortNumber.P0;
+ }
+ }
+ return null;
+ }
+} \ No newline at end of file