/* * Copyright 2014-2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.store.packet.impl; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.NodeId; import org.onosproject.mastership.MastershipService; import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.packet.OutboundPacket; import org.onosproject.net.packet.PacketEvent; import org.onosproject.net.packet.PacketEvent.Type; import org.onosproject.net.packet.PacketRequest; import org.onosproject.net.packet.PacketStore; import org.onosproject.net.packet.PacketStoreDelegate; import org.onosproject.store.AbstractStore; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.slf4j.Logger; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; /** * Distributed packet store implementation allowing packets to be sent to * remote instances. */ @Component(immediate = true) @Service public class DistributedPacketStore extends AbstractStore implements PacketStore { private final Logger log = getLogger(getClass()); // TODO: make this configurable. private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected MastershipService mastershipService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterService clusterService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterCommunicationService communicationService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected StorageService storageService; private PacketRequestTracker tracker; private static final MessageSubject PACKET_OUT_SUBJECT = new MessageSubject("packet-out"); private static final KryoSerializer SERIALIZER = new KryoSerializer() { @Override protected void setupKryoPool() { serializerPool = KryoNamespace.newBuilder() .register(KryoNamespaces.API) .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID) .build(); } }; private ExecutorService messageHandlingExecutor; @Activate public void activate() { messageHandlingExecutor = Executors.newFixedThreadPool( MESSAGE_HANDLER_THREAD_POOL_SIZE, groupedThreads("onos/store/packet", "message-handlers")); communicationService.addSubscriber(PACKET_OUT_SUBJECT, SERIALIZER::decode, packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)), messageHandlingExecutor); tracker = new PacketRequestTracker(); log.info("Started"); } @Deactivate public void deactivate() { communicationService.removeSubscriber(PACKET_OUT_SUBJECT); messageHandlingExecutor.shutdown(); tracker = null; log.info("Stopped"); } @Override public void emit(OutboundPacket packet) { NodeId myId = clusterService.getLocalNode().id(); NodeId master = mastershipService.getMasterFor(packet.sendThrough()); if (master == null) { return; } if (myId.equals(master)) { notifyDelegate(new PacketEvent(Type.EMIT, packet)); return; } communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master) .whenComplete((r, error) -> { if (error != null) { log.warn("Failed to send packet-out to {}", master, error); } }); } @Override public void requestPackets(PacketRequest request) { tracker.add(request); } @Override public void cancelPackets(PacketRequest request) { tracker.remove(request); } @Override public List existingRequests() { return tracker.requests(); } private class PacketRequestTracker { private ConsistentMap> requests; public PacketRequestTracker() { requests = storageService.>consistentMapBuilder() .withName("onos-packet-requests") .withPartitionsDisabled() .withSerializer(Serializer.using(KryoNamespaces.API)) .build(); } public void add(PacketRequest request) { AtomicBoolean firstRequest = new AtomicBoolean(false); requests.compute(request.selector(), (s, existingRequests) -> { if (existingRequests == null) { firstRequest.set(true); return ImmutableSet.of(request); } else if (!existingRequests.contains(request)) { return ImmutableSet.builder() .addAll(existingRequests) .add(request) .build(); } else { return existingRequests; } }); if (firstRequest.get() && delegate != null) { // The instance that makes the first request will push to all devices delegate.requestPackets(request); } } public void remove(PacketRequest request) { AtomicBoolean removedLast = new AtomicBoolean(false); requests.computeIfPresent(request.selector(), (s, existingRequests) -> { if (existingRequests.contains(request)) { Set newRequests = Sets.newHashSet(existingRequests); newRequests.remove(request); if (newRequests.size() > 0) { return ImmutableSet.copyOf(newRequests); } else { removedLast.set(true); return null; } } else { return existingRequests; } }); if (removedLast.get() && delegate != null) { // The instance that removes the last request will remove from all devices delegate.cancelPackets(request); } } public List requests() { List list = Lists.newArrayList(); requests.values().forEach(v -> list.addAll(v.value())); list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); return list; } } }