diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-10-09 18:32:44 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-10-09 18:32:44 -0700 |
commit | 6a07d2d622eaa06953f3353e39c080984076e8de (patch) | |
tree | bfb50a2090fce186c2cc545a400c969bf2ea702b /framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java | |
parent | e6d71622143ff9b2421a1abbe8434b954b5b1099 (diff) |
Updated master to commit id 6ee8aa3e67ce89908a8c93aa9445c6f71a18f986
Change-Id: I94b055ee2f298daf71e2ec794fd0f2495bd8081f
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java | 78 |
1 files changed, 48 insertions, 30 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java index d4c89c93..f0f3eb5e 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java @@ -15,7 +15,9 @@ */ package org.onosproject.store.packet.impl; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -41,14 +43,13 @@ import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Versioned; import org.slf4j.Logger; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; @@ -117,6 +118,7 @@ public class DistributedPacketStore public void deactivate() { communicationService.removeSubscriber(PACKET_OUT_SUBJECT); messageHandlingExecutor.shutdown(); + tracker = null; log.info("Stopped"); } @@ -143,13 +145,13 @@ public class DistributedPacketStore } @Override - public boolean requestPackets(PacketRequest request) { - return tracker.add(request); + public void requestPackets(PacketRequest request) { + tracker.add(request); } @Override - public boolean cancelPackets(PacketRequest request) { - return tracker.remove(request); + public void cancelPackets(PacketRequest request) { + tracker.remove(request); } @Override @@ -169,33 +171,50 @@ public class DistributedPacketStore .build(); } - public boolean add(PacketRequest request) { - Versioned<Set<PacketRequest>> old = requests.get(request.selector()); - if (old != null && old.value().contains(request)) { - return false; + public void add(PacketRequest request) { + AtomicBoolean firstRequest = new AtomicBoolean(false); + requests.compute(request.selector(), (s, existingRequests) -> { + if (existingRequests == null) { + firstRequest.set(true); + return ImmutableSet.of(request); + } else if (!existingRequests.contains(request)) { + return ImmutableSet.<PacketRequest>builder() + .addAll(existingRequests) + .add(request) + .build(); + } else { + return existingRequests; + } + }); + + if (firstRequest.get() && delegate != null) { + // The instance that makes the first request will push to all devices + delegate.requestPackets(request); } - // FIXME: add retry logic using a random delay - Set<PacketRequest> newSet = new HashSet<>(); - newSet.add(request); - if (old == null) { - return requests.putIfAbsent(request.selector(), newSet) == null; - } - newSet.addAll(old.value()); - return requests.replace(request.selector(), old.version(), newSet); } - public boolean remove(PacketRequest request) { - Versioned<Set<PacketRequest>> old = requests.get(request.selector()); - if (old == null || !old.value().contains(request)) { - return false; - } - // FIXME: add retry logic using a random delay - Set<PacketRequest> newSet = new HashSet<>(old.value()); - newSet.remove(request); - if (newSet.isEmpty()) { - return requests.remove(request.selector(), old.version()); + public void remove(PacketRequest request) { + AtomicBoolean removedLast = new AtomicBoolean(false); + requests.computeIfPresent(request.selector(), (s, existingRequests) -> { + if (existingRequests.contains(request)) { + Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests); + newRequests.remove(request); + if (newRequests.size() > 0) { + return ImmutableSet.copyOf(newRequests); + } else { + removedLast.set(true); + return null; + } + } else { + return existingRequests; + } + }); + + if (removedLast.get() && delegate != null) { + // The instance that removes the last request will remove from all devices + delegate.cancelPackets(request); } - return requests.replace(request.selector(), old.version(), newSet); + } public List<PacketRequest> requests() { @@ -204,6 +223,5 @@ public class DistributedPacketStore list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); return list; } - } } |