aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java78
1 files changed, 48 insertions, 30 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index d4c89c93..f0f3eb5e 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,7 +15,9 @@
*/
package org.onosproject.store.packet.impl;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,14 +43,13 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -117,6 +118,7 @@ public class DistributedPacketStore
public void deactivate() {
communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
messageHandlingExecutor.shutdown();
+ tracker = null;
log.info("Stopped");
}
@@ -143,13 +145,13 @@ public class DistributedPacketStore
}
@Override
- public boolean requestPackets(PacketRequest request) {
- return tracker.add(request);
+ public void requestPackets(PacketRequest request) {
+ tracker.add(request);
}
@Override
- public boolean cancelPackets(PacketRequest request) {
- return tracker.remove(request);
+ public void cancelPackets(PacketRequest request) {
+ tracker.remove(request);
}
@Override
@@ -169,33 +171,50 @@ public class DistributedPacketStore
.build();
}
- public boolean add(PacketRequest request) {
- Versioned<Set<PacketRequest>> old = requests.get(request.selector());
- if (old != null && old.value().contains(request)) {
- return false;
+ public void add(PacketRequest request) {
+ AtomicBoolean firstRequest = new AtomicBoolean(false);
+ requests.compute(request.selector(), (s, existingRequests) -> {
+ if (existingRequests == null) {
+ firstRequest.set(true);
+ return ImmutableSet.of(request);
+ } else if (!existingRequests.contains(request)) {
+ return ImmutableSet.<PacketRequest>builder()
+ .addAll(existingRequests)
+ .add(request)
+ .build();
+ } else {
+ return existingRequests;
+ }
+ });
+
+ if (firstRequest.get() && delegate != null) {
+ // The instance that makes the first request will push to all devices
+ delegate.requestPackets(request);
}
- // FIXME: add retry logic using a random delay
- Set<PacketRequest> newSet = new HashSet<>();
- newSet.add(request);
- if (old == null) {
- return requests.putIfAbsent(request.selector(), newSet) == null;
- }
- newSet.addAll(old.value());
- return requests.replace(request.selector(), old.version(), newSet);
}
- public boolean remove(PacketRequest request) {
- Versioned<Set<PacketRequest>> old = requests.get(request.selector());
- if (old == null || !old.value().contains(request)) {
- return false;
- }
- // FIXME: add retry logic using a random delay
- Set<PacketRequest> newSet = new HashSet<>(old.value());
- newSet.remove(request);
- if (newSet.isEmpty()) {
- return requests.remove(request.selector(), old.version());
+ public void remove(PacketRequest request) {
+ AtomicBoolean removedLast = new AtomicBoolean(false);
+ requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+ if (existingRequests.contains(request)) {
+ Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+ newRequests.remove(request);
+ if (newRequests.size() > 0) {
+ return ImmutableSet.copyOf(newRequests);
+ } else {
+ removedLast.set(true);
+ return null;
+ }
+ } else {
+ return existingRequests;
+ }
+ });
+
+ if (removedLast.get() && delegate != null) {
+ // The instance that removes the last request will remove from all devices
+ delegate.cancelPackets(request);
}
- return requests.replace(request.selector(), old.version(), newSet);
+
}
public List<PacketRequest> requests() {
@@ -204,6 +223,5 @@ public class DistributedPacketStore
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
return list;
}
-
}
}