aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java')
-rw-r--r--framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java218
1 files changed, 193 insertions, 25 deletions
diff --git a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index de079e03..6374ca55 100644
--- a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 Open Networking Laboratory
+ * Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -32,6 +33,7 @@ import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.DefaultTableStatisticsEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
@@ -40,6 +42,7 @@ import org.onosproject.net.flow.FlowRuleExtPayLoad;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
+import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.statistic.DefaultLoad;
@@ -58,6 +61,8 @@ import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
+import org.projectfloodlight.openflow.protocol.OFTableStatsReply;
+import org.projectfloodlight.openflow.protocol.OFTableStatsEntry;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFStatsReply;
@@ -70,12 +75,14 @@ import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.slf4j.LoggerFactory.getLogger;
@@ -99,11 +106,16 @@ public class OpenFlowRuleProvider extends AbstractProvider
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
- private static final int DEFAULT_POLL_FREQUENCY = 10;
+ private static final int DEFAULT_POLL_FREQUENCY = 5;
@Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
label = "Frequency (in seconds) for polling flow statistics")
private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
+ private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true;
+ @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING,
+ label = "Adaptive Flow Sampling is on or off")
+ private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
+
private FlowRuleProviderService providerService;
private final InternalFlowProvider listener = new InternalFlowProvider();
@@ -111,7 +123,12 @@ public class OpenFlowRuleProvider extends AbstractProvider
private Cache<Long, InternalCacheEntry> pendingBatches;
private final Timer timer = new Timer("onos-openflow-collector");
+ private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap();
+
+ // NewAdaptiveFlowStatsCollector Set
+ private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap();
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+ private final Map<Dpid, TableStatisticsCollector> tableStatsCollectors = Maps.newHashMap();
/**
* Creates an OpenFlow host provider.
@@ -128,9 +145,11 @@ public class OpenFlowRuleProvider extends AbstractProvider
controller.addEventListener(listener);
pendingBatches = createBatchCache();
+
createCollectors();
- log.info("Started");
+ log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}",
+ flowPollFrequency, adaptiveFlowSampling);
}
@Deactivate
@@ -161,6 +180,20 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
log.info("Settings: flowPollFrequency={}", flowPollFrequency);
+
+ boolean newAdaptiveFlowSampling;
+ String s = get(properties, "adaptiveFlowSampling");
+ newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
+
+ if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
+ // stop previous collector
+ stopCollectors();
+ adaptiveFlowSampling = newAdaptiveFlowSampling;
+ // create new collectors
+ createCollectors();
+ }
+
+ log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
}
private Cache<Long, InternalCacheEntry> createBatchCache() {
@@ -179,19 +212,43 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
private void createCollector(OpenFlowSwitch sw) {
- FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
- fsc.start();
- collectors.put(new Dpid(sw.getId()), fsc);
+ if (adaptiveFlowSampling) {
+ // NewAdaptiveFlowStatsCollector Constructor
+ NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency);
+ fsc.start();
+ afsCollectors.put(new Dpid(sw.getId()), fsc);
+ } else {
+ FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+ fsc.start();
+ simpleCollectors.put(new Dpid(sw.getId()), fsc);
+ }
+ TableStatisticsCollector tsc = new TableStatisticsCollector(timer, sw, flowPollFrequency);
+ tsc.start();
+ tableStatsCollectors.put(new Dpid(sw.getId()), tsc);
}
private void stopCollectors() {
- collectors.values().forEach(FlowStatsCollector::stop);
- collectors.clear();
+ if (adaptiveFlowSampling) {
+ // NewAdaptiveFlowStatsCollector Destructor
+ afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop);
+ afsCollectors.clear();
+ } else {
+ simpleCollectors.values().forEach(FlowStatsCollector::stop);
+ simpleCollectors.clear();
+ }
+ tableStatsCollectors.values().forEach(TableStatisticsCollector::stop);
+ tableStatsCollectors.clear();
}
private void adjustRate() {
DefaultLoad.setPollInterval(flowPollFrequency);
- collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+ if (adaptiveFlowSampling) {
+ // NewAdaptiveFlowStatsCollector calAndPollInterval
+ afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency));
+ } else {
+ simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+ }
+ tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
}
@Override
@@ -202,8 +259,9 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
private void applyRule(FlowRule flowRule) {
- OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
- .uri()));
+ Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+
FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
if (hasPayload(flowRuleExtPayLoad)) {
OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -212,6 +270,14 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty()).buildFlowAdd());
+
+ if (adaptiveFlowSampling) {
+ // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
+ if (collector != null) {
+ collector.addWithFlowRule(flowRule);
+ }
+ }
}
@Override
@@ -222,8 +288,9 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
private void removeRule(FlowRule flowRule) {
- OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
- .uri()));
+ Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+
FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
if (hasPayload(flowRuleExtPayLoad)) {
OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -232,6 +299,14 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty()).buildFlowDel());
+
+ if (adaptiveFlowSampling) {
+ // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
+ if (collector != null) {
+ collector.removeFlows(flowRule);
+ }
+ }
}
@Override
@@ -242,11 +317,12 @@ public class OpenFlowRuleProvider extends AbstractProvider
@Override
public void executeBatch(FlowRuleBatchOperation batch) {
+ checkNotNull(batch);
pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
- OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
- .uri()));
+ Dpid dpid = Dpid.dpid(batch.deviceId().uri());
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
OFFlowMod mod;
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
// flow is the third party privacy flow
@@ -257,21 +333,35 @@ public class OpenFlowRuleProvider extends AbstractProvider
sw.sendMsg(msg);
continue;
}
- FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
- .factory(), Optional.of(batch.id()));
+ FlowModBuilder builder =
+ FlowModBuilder.builder(fbe.target(), sw.factory(), Optional.of(batch.id()));
+ NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
switch (fbe.operator()) {
case ADD:
mod = builder.buildFlowAdd();
+ if (adaptiveFlowSampling && collector != null) {
+ // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ collector.addWithFlowRule(fbe.target());
+ }
break;
case REMOVE:
mod = builder.buildFlowDel();
+ if (adaptiveFlowSampling && collector != null) {
+ // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ collector.removeFlows(fbe.target());
+ }
break;
case MODIFY:
mod = builder.buildFlowMod();
+ if (adaptiveFlowSampling && collector != null) {
+ // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not
+ collector.addOrUpdateFlows((FlowEntry) fbe.target());
+ }
break;
default:
log.error("Unsupported batch operation {}; skipping flowmod {}",
- fbe.operator(), fbe);
+ fbe.operator(), fbe);
continue;
}
sw.sendMsg(mod);
@@ -292,14 +382,28 @@ public class OpenFlowRuleProvider extends AbstractProvider
@Override
public void switchAdded(Dpid dpid) {
+
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+
createCollector(controller.getSwitch(dpid));
}
@Override
public void switchRemoved(Dpid dpid) {
- FlowStatsCollector collector = collectors.remove(dpid);
- if (collector != null) {
- collector.stop();
+ if (adaptiveFlowSampling) {
+ NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid);
+ if (collector != null) {
+ collector.stop();
+ }
+ } else {
+ FlowStatsCollector collector = simpleCollectors.remove(dpid);
+ if (collector != null) {
+ collector.stop();
+ }
+ }
+ TableStatisticsCollector tsc = tableStatsCollectors.remove(dpid);
+ if (tsc != null) {
+ tsc.stop();
}
}
@@ -321,10 +425,20 @@ public class OpenFlowRuleProvider extends AbstractProvider
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
providerService.flowRemoved(fr);
+
+ if (adaptiveFlowSampling) {
+ // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
+ if (collector != null) {
+ collector.flowRemoved(fr);
+ }
+ }
break;
case STATS_REPLY:
if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
+ } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
+ pushTableStatistics(dpid, (OFTableStatsReply) msg);
}
break;
case BARRIER_REPLY:
@@ -370,11 +484,10 @@ public class OpenFlowRuleProvider extends AbstractProvider
+ " tell us which one.");
}
}
- break;
+
default:
log.debug("Unhandled message type: {}", msg.getType());
}
-
}
@Override
@@ -386,13 +499,68 @@ public class OpenFlowRuleProvider extends AbstractProvider
private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) {
DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
- OpenFlowSwitch sw = controller.getSwitch(dpid);
List<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(dpid, entry).build())
.collect(Collectors.toList());
- providerService.pushFlowMetrics(did, flowEntries);
+ if (adaptiveFlowSampling) {
+ NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
+
+ synchronized (afsc) {
+ if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
+ log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, "
+ + "OFFlowStatsReply Xid={}, for {}",
+ afsc.getFlowMissingXid(), replies.getXid(), dpid);
+ }
+
+ // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
+ if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
+ if (afsc.getFlowMissingXid() == replies.getXid()) {
+ // call entire flow stats update with flowMissing synchronization.
+ // used existing pushFlowMetrics
+ providerService.pushFlowMetrics(did, flowEntries);
+ }
+ // reset flowMissingXid to NO_FLOW_MISSING_XID
+ afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID);
+
+ } else {
+ // call individual flow stats update
+ providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
+ }
+
+ // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+ afsc.pushFlowMetrics(flowEntries);
+ }
+ } else {
+ // call existing entire flow stats update with flowMissing synchronization
+ providerService.pushFlowMetrics(did, flowEntries);
+ }
+ }
+
+ private void pushTableStatistics(Dpid dpid, OFTableStatsReply replies) {
+
+ DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
+ List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream()
+ .map(entry -> buildTableStatistics(did, entry))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ providerService.pushTableStatistics(did, tableStatsEntries);
+ }
+
+ private TableStatisticsEntry buildTableStatistics(DeviceId deviceId,
+ OFTableStatsEntry ofEntry) {
+ TableStatisticsEntry entry = null;
+ if (ofEntry != null) {
+ entry = new DefaultTableStatisticsEntry(deviceId,
+ ofEntry.getTableId().getValue(),
+ ofEntry.getActiveCount(),
+ ofEntry.getLookupCount().getValue(),
+ ofEntry.getMatchedCount().getValue());
+ }
+
+ return entry;
+
}
}