diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java new file mode 100644 index 00000000..851185b5 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java @@ -0,0 +1,174 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.proxyarp.impl; + +import com.google.common.collect.Maps; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.Host; +import org.onosproject.net.HostId; +import org.onosproject.net.host.HostEvent; +import org.onosproject.net.host.HostListener; +import org.onosproject.net.host.HostService; +import org.onosproject.net.proxyarp.ProxyArpStore; +import org.onosproject.net.proxyarp.ProxyArpStoreDelegate; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.serializers.KryoSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; +import static org.onlab.util.Tools.groupedThreads; + +/** + * Implementation of proxy ARP distribution mechanism. + */ +@Component(immediate = true) +@Service +public class DistributedProxyArpStore implements ProxyArpStore { + + private Logger log = LoggerFactory.getLogger(getClass()); + + private static final MessageSubject ARP_RESPONSE_MESSAGE = + new MessageSubject("onos-arp-response"); + + protected final KryoSerializer serializer = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(ArpResponseMessage.class) + .register(ByteBuffer.class) + .build(); + } + }; + + private ProxyArpStoreDelegate delegate; + + private Map<HostId, ArpResponseMessage> pendingMessages = Maps.newConcurrentMap(); + + private ExecutorService executor = + newFixedThreadPool(4, groupedThreads("onos/arp", "sender-%d")); + + private NodeId localNodeId; + + private HostListener hostListener = new InternalHostListener(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService commService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected HostService hostService; + + + @Activate + protected void activate() { + localNodeId = clusterService.getLocalNode().id(); + hostService.addListener(hostListener); + commService.addSubscriber(ARP_RESPONSE_MESSAGE, serializer::decode, + this::processArpResponse, executor); + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + commService.removeSubscriber(ARP_RESPONSE_MESSAGE); + hostService.removeListener(hostListener); + log.info("Stopped"); + } + + @Override + public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) { + NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId()); + if (nodeId.equals(localNodeId)) { + if (delegate != null) { + delegate.emitResponse(outPort, packet); + } + } else { + log.info("Forwarding ARP response from {} to {}", subject.id(), outPort); + commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()), + ARP_RESPONSE_MESSAGE, serializer::encode, nodeId); + } + } + + @Override + public void setDelegate(ProxyArpStoreDelegate delegate) { + this.delegate = delegate; + } + + // Processes the incoming ARP response message. + private void processArpResponse(ArpResponseMessage msg) { + pendingMessages.put(msg.subject.id(), msg); + if (hostService.getHost(msg.subject.id()) != null) { + checkPendingArps(msg.subject.id()); + } + // FIXME: figure out pruning so stuff does not build up + } + + // Checks for pending ARP response message for the specified host. + // If one exists, emit response via delegate. + private void checkPendingArps(HostId id) { + ArpResponseMessage msg = pendingMessages.remove(id); + if (msg != null && delegate != null) { + log.info("Emitting ARP response from {} to {}", id, msg.outPort); + delegate.emitResponse(msg.outPort, ByteBuffer.wrap(msg.packet)); + } + } + + // Message carrying an ARP response. + private static class ArpResponseMessage { + private ConnectPoint outPort; + private Host subject; + private byte[] packet; + + public ArpResponseMessage(ConnectPoint outPort, Host subject, byte[] packet) { + this.outPort = outPort; + this.subject = subject; + this.packet = packet; + } + + private ArpResponseMessage() { + } + } + + private class InternalHostListener implements HostListener { + @Override + public void event(HostEvent event) { + checkPendingArps(event.subject().id()); + } + } +} |