aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java64
1 files changed, 64 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index de7a3ac3..8cd63e7d 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.flow.impl;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -57,6 +58,7 @@ import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -64,9 +66,16 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -151,6 +160,13 @@ public class NewDistributedFlowRuleStore
private final ScheduledExecutorService backupSenderExecutor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
+ private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
+ private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
+ new InternalTableStatsListener();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -161,6 +177,11 @@ public class NewDistributedFlowRuleStore
}
};
+ protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MastershipBasedTimestamp.class);
+
+
private IdGenerator idGenerator;
private NodeId local;
@@ -186,6 +207,15 @@ public class NewDistributedFlowRuleStore
TimeUnit.MILLISECONDS);
}
+ deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
+ .withName("onos-flow-table-stats")
+ .withSerializer(SERIALIZER_BUILDER)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+ deviceTableStats.addListener(tableStatsListener);
+
logConfig("Started");
}
@@ -197,6 +227,8 @@ public class NewDistributedFlowRuleStore
}
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
+ deviceTableStats.removeListener(tableStatsListener);
+ deviceTableStats.destroy();
messageHandlingExecutor.shutdownNow();
backupSenderExecutor.shutdownNow();
log.info("Stopped");
@@ -786,4 +818,36 @@ public class NewDistributedFlowRuleStore
return backedupDevices;
}
}
+
+ @Override
+ public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
+ List<TableStatisticsEntry> tableStats) {
+ deviceTableStats.put(deviceId, tableStats);
+ return null;
+ }
+
+ @Override
+ public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.debug("Failed to getTableStats: No master for {}", deviceId);
+ return Collections.emptyList();
+ }
+
+ List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
+ if (tableStats == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(tableStats);
+ }
+
+ private class InternalTableStatsListener
+ implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceId,
+ List<TableStatisticsEntry>> event) {
+ //TODO: Generate an event to listeners (do we need?)
+ }
+ }
}