aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java43
1 files changed, 29 insertions, 14 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index f0f3eb5e..3865a779 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -41,6 +41,7 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
@@ -52,6 +53,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.retryable;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -66,6 +68,8 @@ public class DistributedPacketStore
private final Logger log = getLogger(getClass());
+ private static final int MAX_BACKOFF = 10;
+
// TODO: make this configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@@ -159,11 +163,11 @@ public class DistributedPacketStore
return tracker.requests();
}
- private class PacketRequestTracker {
+ private final class PacketRequestTracker {
private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
- public PacketRequestTracker() {
+ private PacketRequestTracker() {
requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
.withName("onos-packet-requests")
.withPartitionsDisabled()
@@ -171,7 +175,17 @@ public class DistributedPacketStore
.build();
}
- public void add(PacketRequest request) {
+ private void add(PacketRequest request) {
+ AtomicBoolean firstRequest =
+ retryable(this::addInternal, ConsistentMapException.class,
+ 3, MAX_BACKOFF).apply(request);
+ if (firstRequest.get() && delegate != null) {
+ // The instance that makes the first request will push to all devices
+ delegate.requestPackets(request);
+ }
+ }
+
+ private AtomicBoolean addInternal(PacketRequest request) {
AtomicBoolean firstRequest = new AtomicBoolean(false);
requests.compute(request.selector(), (s, existingRequests) -> {
if (existingRequests == null) {
@@ -186,14 +200,20 @@ public class DistributedPacketStore
return existingRequests;
}
});
+ return firstRequest;
+ }
- if (firstRequest.get() && delegate != null) {
- // The instance that makes the first request will push to all devices
- delegate.requestPackets(request);
+ private void remove(PacketRequest request) {
+ AtomicBoolean removedLast =
+ retryable(this::removeInternal, ConsistentMapException.class,
+ 3, MAX_BACKOFF).apply(request);
+ if (removedLast.get() && delegate != null) {
+ // The instance that removes the last request will remove from all devices
+ delegate.cancelPackets(request);
}
}
- public void remove(PacketRequest request) {
+ private AtomicBoolean removeInternal(PacketRequest request) {
AtomicBoolean removedLast = new AtomicBoolean(false);
requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
if (existingRequests.contains(request)) {
@@ -209,15 +229,10 @@ public class DistributedPacketStore
return existingRequests;
}
});
-
- if (removedLast.get() && delegate != null) {
- // The instance that removes the last request will remove from all devices
- delegate.cancelPackets(request);
- }
-
+ return removedLast;
}
- public List<PacketRequest> requests() {
+ private List<PacketRequest> requests() {
List<PacketRequest> list = Lists.newArrayList();
requests.values().forEach(v -> list.addAll(v.value()));
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());