diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet')
2 files changed, 0 insertions, 262 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java deleted file mode 100644 index 3865a779..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.packet.impl; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.util.KryoNamespace; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.mastership.MastershipService; -import org.onosproject.net.flow.TrafficSelector; -import org.onosproject.net.packet.OutboundPacket; -import org.onosproject.net.packet.PacketEvent; -import org.onosproject.net.packet.PacketEvent.Type; -import org.onosproject.net.packet.PacketRequest; -import org.onosproject.net.packet.PacketStore; -import org.onosproject.net.packet.PacketStoreDelegate; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.StorageService; -import org.slf4j.Logger; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.onlab.util.Tools.groupedThreads; -import static org.onlab.util.Tools.retryable; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * Distributed packet store implementation allowing packets to be sent to - * remote instances. - */ -@Component(immediate = true) -@Service -public class DistributedPacketStore - extends AbstractStore<PacketEvent, PacketStoreDelegate> - implements PacketStore { - - private final Logger log = getLogger(getClass()); - - private static final int MAX_BACKOFF = 10; - - // TODO: make this configurable. - private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService communicationService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - private PacketRequestTracker tracker; - - private static final MessageSubject PACKET_OUT_SUBJECT = - new MessageSubject("packet-out"); - - private static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID) - .build(); - } - }; - - private ExecutorService messageHandlingExecutor; - - @Activate - public void activate() { - messageHandlingExecutor = Executors.newFixedThreadPool( - MESSAGE_HANDLER_THREAD_POOL_SIZE, - groupedThreads("onos/store/packet", "message-handlers")); - - communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT, - SERIALIZER::decode, - packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)), - messageHandlingExecutor); - - tracker = new PacketRequestTracker(); - - log.info("Started"); - } - - @Deactivate - public void deactivate() { - communicationService.removeSubscriber(PACKET_OUT_SUBJECT); - messageHandlingExecutor.shutdown(); - tracker = null; - log.info("Stopped"); - } - - @Override - public void emit(OutboundPacket packet) { - NodeId myId = clusterService.getLocalNode().id(); - NodeId master = mastershipService.getMasterFor(packet.sendThrough()); - - if (master == null) { - return; - } - - if (myId.equals(master)) { - notifyDelegate(new PacketEvent(Type.EMIT, packet)); - return; - } - - communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master) - .whenComplete((r, error) -> { - if (error != null) { - log.warn("Failed to send packet-out to {}", master, error); - } - }); - } - - @Override - public void requestPackets(PacketRequest request) { - tracker.add(request); - } - - @Override - public void cancelPackets(PacketRequest request) { - tracker.remove(request); - } - - @Override - public List<PacketRequest> existingRequests() { - return tracker.requests(); - } - - private final class PacketRequestTracker { - - private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests; - - private PacketRequestTracker() { - requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder() - .withName("onos-packet-requests") - .withPartitionsDisabled() - .withSerializer(Serializer.using(KryoNamespaces.API)) - .build(); - } - - private void add(PacketRequest request) { - AtomicBoolean firstRequest = - retryable(this::addInternal, ConsistentMapException.class, - 3, MAX_BACKOFF).apply(request); - if (firstRequest.get() && delegate != null) { - // The instance that makes the first request will push to all devices - delegate.requestPackets(request); - } - } - - private AtomicBoolean addInternal(PacketRequest request) { - AtomicBoolean firstRequest = new AtomicBoolean(false); - requests.compute(request.selector(), (s, existingRequests) -> { - if (existingRequests == null) { - firstRequest.set(true); - return ImmutableSet.of(request); - } else if (!existingRequests.contains(request)) { - return ImmutableSet.<PacketRequest>builder() - .addAll(existingRequests) - .add(request) - .build(); - } else { - return existingRequests; - } - }); - return firstRequest; - } - - private void remove(PacketRequest request) { - AtomicBoolean removedLast = - retryable(this::removeInternal, ConsistentMapException.class, - 3, MAX_BACKOFF).apply(request); - if (removedLast.get() && delegate != null) { - // The instance that removes the last request will remove from all devices - delegate.cancelPackets(request); - } - } - - private AtomicBoolean removeInternal(PacketRequest request) { - AtomicBoolean removedLast = new AtomicBoolean(false); - requests.computeIfPresent(request.selector(), (s, existingRequests) -> { - if (existingRequests.contains(request)) { - Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests); - newRequests.remove(request); - if (newRequests.size() > 0) { - return ImmutableSet.copyOf(newRequests); - } else { - removedLast.set(true); - return null; - } - } else { - return existingRequests; - } - }); - return removedLast; - } - - private List<PacketRequest> requests() { - List<PacketRequest> list = Lists.newArrayList(); - requests.values().forEach(v -> list.addAll(v.value())); - list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); - return list; - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/package-info.java deleted file mode 100644 index 43282583..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of distributed packet store. - */ -package org.onosproject.store.packet.impl; |