diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java index f0f3eb5e..3865a779 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java @@ -41,6 +41,7 @@ import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.ConsistentMapException; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.slf4j.Logger; @@ -52,6 +53,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import static org.onlab.util.Tools.groupedThreads; +import static org.onlab.util.Tools.retryable; import static org.slf4j.LoggerFactory.getLogger; /** @@ -66,6 +68,8 @@ public class DistributedPacketStore private final Logger log = getLogger(getClass()); + private static final int MAX_BACKOFF = 10; + // TODO: make this configurable. private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4; @@ -159,11 +163,11 @@ public class DistributedPacketStore return tracker.requests(); } - private class PacketRequestTracker { + private final class PacketRequestTracker { private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests; - public PacketRequestTracker() { + private PacketRequestTracker() { requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder() .withName("onos-packet-requests") .withPartitionsDisabled() @@ -171,7 +175,17 @@ public class DistributedPacketStore .build(); } - public void add(PacketRequest request) { + private void add(PacketRequest request) { + AtomicBoolean firstRequest = + retryable(this::addInternal, ConsistentMapException.class, + 3, MAX_BACKOFF).apply(request); + if (firstRequest.get() && delegate != null) { + // The instance that makes the first request will push to all devices + delegate.requestPackets(request); + } + } + + private AtomicBoolean addInternal(PacketRequest request) { AtomicBoolean firstRequest = new AtomicBoolean(false); requests.compute(request.selector(), (s, existingRequests) -> { if (existingRequests == null) { @@ -186,14 +200,20 @@ public class DistributedPacketStore return existingRequests; } }); + return firstRequest; + } - if (firstRequest.get() && delegate != null) { - // The instance that makes the first request will push to all devices - delegate.requestPackets(request); + private void remove(PacketRequest request) { + AtomicBoolean removedLast = + retryable(this::removeInternal, ConsistentMapException.class, + 3, MAX_BACKOFF).apply(request); + if (removedLast.get() && delegate != null) { + // The instance that removes the last request will remove from all devices + delegate.cancelPackets(request); } } - public void remove(PacketRequest request) { + private AtomicBoolean removeInternal(PacketRequest request) { AtomicBoolean removedLast = new AtomicBoolean(false); requests.computeIfPresent(request.selector(), (s, existingRequests) -> { if (existingRequests.contains(request)) { @@ -209,15 +229,10 @@ public class DistributedPacketStore return existingRequests; } }); - - if (removedLast.get() && delegate != null) { - // The instance that removes the last request will remove from all devices - delegate.cancelPackets(request); - } - + return removedLast; } - public List<PacketRequest> requests() { + private List<PacketRequest> requests() { List<PacketRequest> list = Lists.newArrayList(); requests.values().forEach(v -> list.addAll(v.value())); list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); |