diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-10-09 18:32:44 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-10-09 18:32:44 -0700 |
commit | 6a07d2d622eaa06953f3353e39c080984076e8de (patch) | |
tree | bfb50a2090fce186c2cc545a400c969bf2ea702b /framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java | |
parent | e6d71622143ff9b2421a1abbe8434b954b5b1099 (diff) |
Updated master to commit id 6ee8aa3e67ce89908a8c93aa9445c6f71a18f986
Change-Id: I94b055ee2f298daf71e2ec794fd0f2495bd8081f
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java')
-rw-r--r-- | framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java | 148 |
1 files changed, 110 insertions, 38 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java index a0bc693c..8e87a07d 100644 --- a/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java +++ b/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java @@ -15,7 +15,8 @@ */ package org.onosproject.net.packet.impl; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -43,6 +44,7 @@ import org.onosproject.net.packet.PacketContext; import org.onosproject.net.packet.PacketEvent; import org.onosproject.net.packet.PacketPriority; import org.onosproject.net.packet.PacketProcessor; +import org.onosproject.net.packet.PacketProcessorEntry; import org.onosproject.net.packet.PacketProvider; import org.onosproject.net.packet.PacketProviderRegistry; import org.onosproject.net.packet.PacketProviderService; @@ -55,8 +57,6 @@ import org.onosproject.net.provider.AbstractProviderService; import org.slf4j.Logger; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -102,18 +102,18 @@ public class PacketManager private final DeviceListener deviceListener = new InternalDeviceListener(); - private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>(); + private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList(); private ApplicationId appId; @Activate public void activate() { eventHandlingExecutor = Executors.newSingleThreadExecutor( - groupedThreads("onos/net/packet", "event-handler")); + groupedThreads("onos/net/packet", "event-handler")); appId = coreService.getAppId(CoreService.CORE_APP_NAME); store.setDelegate(delegate); deviceService.addListener(deviceListener); - // TODO: Should we request packets for all existing devices? I believe we should. + store.existingRequests().forEach(this::pushToAllDevices); log.info("Started"); } @@ -129,19 +129,35 @@ public class PacketManager public void addProcessor(PacketProcessor processor, int priority) { checkPermission(PACKET_EVENT); checkNotNull(processor, "Processor cannot be null"); - processors.put(priority, processor); + ProcessorEntry entry = new ProcessorEntry(processor, priority); + + // Insert the new processor according to its priority. + int i = 0; + for (; i < processors.size(); i++) { + if (priority < processors.get(i).priority()) { + break; + } + } + processors.add(i, entry); } @Override public void removeProcessor(PacketProcessor processor) { checkPermission(PACKET_EVENT); checkNotNull(processor, "Processor cannot be null"); - processors.values().remove(processor); + + // Remove the processor entry. + for (int i = 0; i < processors.size(); i++) { + if (processors.get(i).processor() == processor) { + processors.remove(i); + break; + } + } } @Override - public Map<Integer, PacketProcessor> getProcessors() { - return ImmutableMap.copyOf(processors); + public List<PacketProcessorEntry> getProcessors() { + return ImmutableList.copyOf(processors); } @Override @@ -152,9 +168,7 @@ public class PacketManager checkNotNull(appId, "Application ID cannot be null"); PacketRequest request = new DefaultPacketRequest(selector, priority, appId); - if (store.requestPackets(request)) { - pushToAllDevices(request); - } + store.requestPackets(request); } @Override @@ -165,9 +179,7 @@ public class PacketManager checkNotNull(appId, "Application ID cannot be null"); PacketRequest request = new DefaultPacketRequest(selector, priority, appId); - if (store.cancelPackets(request)) { - removeFromAllDevices(request); - } + store.cancelPackets(request); } @Override @@ -176,6 +188,18 @@ public class PacketManager } /** + * Pushes all rules to the specified device. + * + * @param device device on which to install packet request flows + */ + private void pushRulesToDevice(Device device) { + log.debug("Pushing packet requests to device {}", device.id()); + for (PacketRequest request : store.existingRequests()) { + pushRule(device, request); + } + } + + /** * Pushes a packet request flow rule to all devices. * * @param request the packet request @@ -187,16 +211,13 @@ public class PacketManager } } - /** * Removes packet request flow rule from all devices. * * @param request the packet request */ private void removeFromAllDevices(PacketRequest request) { - for (Device device : deviceService.getDevices()) { - removeRule(device, request); - } + deviceService.getAvailableDevices().forEach(d -> removeRule(d, request)); } /** @@ -232,7 +253,6 @@ public class PacketManager if (!device.type().equals(Device.Type.SWITCH)) { return; } - ForwardingObjective forwarding = createBuilder(request) .remove(new ObjectiveContext() { @Override @@ -241,7 +261,6 @@ public class PacketManager request, device.id(), error); } }); - objectiveService.forward(device.id(), forwarding); } @@ -263,12 +282,10 @@ public class PacketManager } private void localEmit(OutboundPacket packet) { - final Device device = deviceService.getDevice(packet.sendThrough()); - + Device device = deviceService.getDevice(packet.sendThrough()); if (device == null) { return; } - PacketProvider packetProvider = getProvider(device.providerId()); if (packetProvider != null) { packetProvider.emit(packet); @@ -280,7 +297,9 @@ public class PacketManager return new InternalPacketProviderService(provider); } - // Personalized packet provider service issued to the supplied provider. + /** + * Personalized packet provider service issued to the supplied provider. + */ private class InternalPacketProviderService extends AbstractProviderService<PacketProvider> implements PacketProviderService { @@ -292,8 +311,10 @@ public class PacketManager @Override public void processPacket(PacketContext context) { // TODO filter packets sent to processors based on registrations - for (PacketProcessor processor : processors.values()) { - processor.process(context); + for (ProcessorEntry entry : processors) { + long start = System.nanoTime(); + entry.processor().process(context); + entry.addNanos(System.nanoTime() - start); } } @@ -307,6 +328,16 @@ public class PacketManager public void notify(PacketEvent event) { localEmit(event.subject()); } + + @Override + public void requestPackets(PacketRequest request) { + pushToAllDevices(request); + } + + @Override + public void cancelPackets(PacketRequest request) { + removeFromAllDevices(request); + } } /** @@ -319,17 +350,14 @@ public class PacketManager try { Device device = event.subject(); switch (event.type()) { - case DEVICE_ADDED: - case DEVICE_AVAILABILITY_CHANGED: - if (deviceService.isAvailable(event.subject().id())) { - log.debug("Pushing packet requests to device {}", event.subject().id()); - for (PacketRequest request : store.existingRequests()) { - pushRule(device, request); + case DEVICE_ADDED: + case DEVICE_AVAILABILITY_CHANGED: + if (deviceService.isAvailable(event.subject().id())) { + pushRulesToDevice(device); } - } - break; - default: - break; + break; + default: + break; } } catch (Exception e) { log.warn("Failed to process {}", event, e); @@ -338,4 +366,48 @@ public class PacketManager } } + /** + * Entity for tracking stats for a packet processor. + */ + private class ProcessorEntry implements PacketProcessorEntry { + private final PacketProcessor processor; + private final int priority; + private long invocations = 0; + private long nanos = 0; + + public ProcessorEntry(PacketProcessor processor, int priority) { + this.processor = processor; + this.priority = priority; + } + + @Override + public PacketProcessor processor() { + return processor; + } + + @Override + public int priority() { + return priority; + } + + @Override + public long invocations() { + return invocations; + } + + @Override + public long totalNanos() { + return nanos; + } + + @Override + public long averageNanos() { + return invocations > 0 ? nanos / invocations : 0; + } + + void addNanos(long nanos) { + this.nanos += nanos; + this.invocations++; + } + } } |