aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java')
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java148
1 files changed, 110 insertions, 38 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index a0bc693c..8e87a07d 100644
--- a/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/framework/src/onos/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -15,7 +15,8 @@
*/
package org.onosproject.net.packet.impl;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,6 +44,7 @@ import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketProcessorEntry;
import org.onosproject.net.packet.PacketProvider;
import org.onosproject.net.packet.PacketProviderRegistry;
import org.onosproject.net.packet.PacketProviderService;
@@ -55,8 +57,6 @@ import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -102,18 +102,18 @@ public class PacketManager
private final DeviceListener deviceListener = new InternalDeviceListener();
- private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
+ private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList();
private ApplicationId appId;
@Activate
public void activate() {
eventHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/net/packet", "event-handler"));
+ groupedThreads("onos/net/packet", "event-handler"));
appId = coreService.getAppId(CoreService.CORE_APP_NAME);
store.setDelegate(delegate);
deviceService.addListener(deviceListener);
- // TODO: Should we request packets for all existing devices? I believe we should.
+ store.existingRequests().forEach(this::pushToAllDevices);
log.info("Started");
}
@@ -129,19 +129,35 @@ public class PacketManager
public void addProcessor(PacketProcessor processor, int priority) {
checkPermission(PACKET_EVENT);
checkNotNull(processor, "Processor cannot be null");
- processors.put(priority, processor);
+ ProcessorEntry entry = new ProcessorEntry(processor, priority);
+
+ // Insert the new processor according to its priority.
+ int i = 0;
+ for (; i < processors.size(); i++) {
+ if (priority < processors.get(i).priority()) {
+ break;
+ }
+ }
+ processors.add(i, entry);
}
@Override
public void removeProcessor(PacketProcessor processor) {
checkPermission(PACKET_EVENT);
checkNotNull(processor, "Processor cannot be null");
- processors.values().remove(processor);
+
+ // Remove the processor entry.
+ for (int i = 0; i < processors.size(); i++) {
+ if (processors.get(i).processor() == processor) {
+ processors.remove(i);
+ break;
+ }
+ }
}
@Override
- public Map<Integer, PacketProcessor> getProcessors() {
- return ImmutableMap.copyOf(processors);
+ public List<PacketProcessorEntry> getProcessors() {
+ return ImmutableList.copyOf(processors);
}
@Override
@@ -152,9 +168,7 @@ public class PacketManager
checkNotNull(appId, "Application ID cannot be null");
PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
- if (store.requestPackets(request)) {
- pushToAllDevices(request);
- }
+ store.requestPackets(request);
}
@Override
@@ -165,9 +179,7 @@ public class PacketManager
checkNotNull(appId, "Application ID cannot be null");
PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
- if (store.cancelPackets(request)) {
- removeFromAllDevices(request);
- }
+ store.cancelPackets(request);
}
@Override
@@ -176,6 +188,18 @@ public class PacketManager
}
/**
+ * Pushes all rules to the specified device.
+ *
+ * @param device device on which to install packet request flows
+ */
+ private void pushRulesToDevice(Device device) {
+ log.debug("Pushing packet requests to device {}", device.id());
+ for (PacketRequest request : store.existingRequests()) {
+ pushRule(device, request);
+ }
+ }
+
+ /**
* Pushes a packet request flow rule to all devices.
*
* @param request the packet request
@@ -187,16 +211,13 @@ public class PacketManager
}
}
-
/**
* Removes packet request flow rule from all devices.
*
* @param request the packet request
*/
private void removeFromAllDevices(PacketRequest request) {
- for (Device device : deviceService.getDevices()) {
- removeRule(device, request);
- }
+ deviceService.getAvailableDevices().forEach(d -> removeRule(d, request));
}
/**
@@ -232,7 +253,6 @@ public class PacketManager
if (!device.type().equals(Device.Type.SWITCH)) {
return;
}
-
ForwardingObjective forwarding = createBuilder(request)
.remove(new ObjectiveContext() {
@Override
@@ -241,7 +261,6 @@ public class PacketManager
request, device.id(), error);
}
});
-
objectiveService.forward(device.id(), forwarding);
}
@@ -263,12 +282,10 @@ public class PacketManager
}
private void localEmit(OutboundPacket packet) {
- final Device device = deviceService.getDevice(packet.sendThrough());
-
+ Device device = deviceService.getDevice(packet.sendThrough());
if (device == null) {
return;
}
-
PacketProvider packetProvider = getProvider(device.providerId());
if (packetProvider != null) {
packetProvider.emit(packet);
@@ -280,7 +297,9 @@ public class PacketManager
return new InternalPacketProviderService(provider);
}
- // Personalized packet provider service issued to the supplied provider.
+ /**
+ * Personalized packet provider service issued to the supplied provider.
+ */
private class InternalPacketProviderService
extends AbstractProviderService<PacketProvider>
implements PacketProviderService {
@@ -292,8 +311,10 @@ public class PacketManager
@Override
public void processPacket(PacketContext context) {
// TODO filter packets sent to processors based on registrations
- for (PacketProcessor processor : processors.values()) {
- processor.process(context);
+ for (ProcessorEntry entry : processors) {
+ long start = System.nanoTime();
+ entry.processor().process(context);
+ entry.addNanos(System.nanoTime() - start);
}
}
@@ -307,6 +328,16 @@ public class PacketManager
public void notify(PacketEvent event) {
localEmit(event.subject());
}
+
+ @Override
+ public void requestPackets(PacketRequest request) {
+ pushToAllDevices(request);
+ }
+
+ @Override
+ public void cancelPackets(PacketRequest request) {
+ removeFromAllDevices(request);
+ }
}
/**
@@ -319,17 +350,14 @@ public class PacketManager
try {
Device device = event.subject();
switch (event.type()) {
- case DEVICE_ADDED:
- case DEVICE_AVAILABILITY_CHANGED:
- if (deviceService.isAvailable(event.subject().id())) {
- log.debug("Pushing packet requests to device {}", event.subject().id());
- for (PacketRequest request : store.existingRequests()) {
- pushRule(device, request);
+ case DEVICE_ADDED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (deviceService.isAvailable(event.subject().id())) {
+ pushRulesToDevice(device);
}
- }
- break;
- default:
- break;
+ break;
+ default:
+ break;
}
} catch (Exception e) {
log.warn("Failed to process {}", event, e);
@@ -338,4 +366,48 @@ public class PacketManager
}
}
+ /**
+ * Entity for tracking stats for a packet processor.
+ */
+ private class ProcessorEntry implements PacketProcessorEntry {
+ private final PacketProcessor processor;
+ private final int priority;
+ private long invocations = 0;
+ private long nanos = 0;
+
+ public ProcessorEntry(PacketProcessor processor, int priority) {
+ this.processor = processor;
+ this.priority = priority;
+ }
+
+ @Override
+ public PacketProcessor processor() {
+ return processor;
+ }
+
+ @Override
+ public int priority() {
+ return priority;
+ }
+
+ @Override
+ public long invocations() {
+ return invocations;
+ }
+
+ @Override
+ public long totalNanos() {
+ return nanos;
+ }
+
+ @Override
+ public long averageNanos() {
+ return invocations > 0 ? nanos / invocations : 0;
+ }
+
+ void addNanos(long nanos) {
+ this.nanos += nanos;
+ this.invocations++;
+ }
+ }
}