diff options
Diffstat (limited to 'framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java')
-rw-r--r-- | framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java | 218 |
1 files changed, 193 insertions, 25 deletions
diff --git a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java index de079e03..6374ca55 100644 --- a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java +++ b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 Open Networking Laboratory + * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import com.google.common.cache.RemovalCause; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -32,6 +33,7 @@ import org.onosproject.cfg.ComponentConfigService; import org.onosproject.core.ApplicationId; import org.onosproject.net.DeviceId; import org.onosproject.net.flow.CompletedBatchOperation; +import org.onosproject.net.flow.DefaultTableStatisticsEntry; import org.onosproject.net.flow.FlowEntry; import org.onosproject.net.flow.FlowRule; import org.onosproject.net.flow.FlowRuleBatchEntry; @@ -40,6 +42,7 @@ import org.onosproject.net.flow.FlowRuleExtPayLoad; import org.onosproject.net.flow.FlowRuleProvider; import org.onosproject.net.flow.FlowRuleProviderRegistry; import org.onosproject.net.flow.FlowRuleProviderService; +import org.onosproject.net.flow.TableStatisticsEntry; import org.onosproject.net.provider.AbstractProvider; import org.onosproject.net.provider.ProviderId; import org.onosproject.net.statistic.DefaultLoad; @@ -58,6 +61,8 @@ import org.projectfloodlight.openflow.protocol.OFErrorType; import org.projectfloodlight.openflow.protocol.OFFlowMod; import org.projectfloodlight.openflow.protocol.OFFlowRemoved; import org.projectfloodlight.openflow.protocol.OFFlowStatsReply; +import org.projectfloodlight.openflow.protocol.OFTableStatsReply; +import org.projectfloodlight.openflow.protocol.OFTableStatsEntry; import org.projectfloodlight.openflow.protocol.OFMessage; import org.projectfloodlight.openflow.protocol.OFPortStatus; import org.projectfloodlight.openflow.protocol.OFStatsReply; @@ -70,12 +75,14 @@ import java.util.Collections; import java.util.Dictionary; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.Timer; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Strings.isNullOrEmpty; import static org.onlab.util.Tools.get; import static org.slf4j.LoggerFactory.getLogger; @@ -99,11 +106,16 @@ public class OpenFlowRuleProvider extends AbstractProvider @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ComponentConfigService cfgService; - private static final int DEFAULT_POLL_FREQUENCY = 10; + private static final int DEFAULT_POLL_FREQUENCY = 5; @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY, label = "Frequency (in seconds) for polling flow statistics") private int flowPollFrequency = DEFAULT_POLL_FREQUENCY; + private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true; + @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING, + label = "Adaptive Flow Sampling is on or off") + private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING; + private FlowRuleProviderService providerService; private final InternalFlowProvider listener = new InternalFlowProvider(); @@ -111,7 +123,12 @@ public class OpenFlowRuleProvider extends AbstractProvider private Cache<Long, InternalCacheEntry> pendingBatches; private final Timer timer = new Timer("onos-openflow-collector"); + private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap(); + + // NewAdaptiveFlowStatsCollector Set + private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap(); private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap(); + private final Map<Dpid, TableStatisticsCollector> tableStatsCollectors = Maps.newHashMap(); /** * Creates an OpenFlow host provider. @@ -128,9 +145,11 @@ public class OpenFlowRuleProvider extends AbstractProvider controller.addEventListener(listener); pendingBatches = createBatchCache(); + createCollectors(); - log.info("Started"); + log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}", + flowPollFrequency, adaptiveFlowSampling); } @Deactivate @@ -161,6 +180,20 @@ public class OpenFlowRuleProvider extends AbstractProvider } log.info("Settings: flowPollFrequency={}", flowPollFrequency); + + boolean newAdaptiveFlowSampling; + String s = get(properties, "adaptiveFlowSampling"); + newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim()); + + if (newAdaptiveFlowSampling != adaptiveFlowSampling) { + // stop previous collector + stopCollectors(); + adaptiveFlowSampling = newAdaptiveFlowSampling; + // create new collectors + createCollectors(); + } + + log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling); } private Cache<Long, InternalCacheEntry> createBatchCache() { @@ -179,19 +212,43 @@ public class OpenFlowRuleProvider extends AbstractProvider } private void createCollector(OpenFlowSwitch sw) { - FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency); - fsc.start(); - collectors.put(new Dpid(sw.getId()), fsc); + if (adaptiveFlowSampling) { + // NewAdaptiveFlowStatsCollector Constructor + NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency); + fsc.start(); + afsCollectors.put(new Dpid(sw.getId()), fsc); + } else { + FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency); + fsc.start(); + simpleCollectors.put(new Dpid(sw.getId()), fsc); + } + TableStatisticsCollector tsc = new TableStatisticsCollector(timer, sw, flowPollFrequency); + tsc.start(); + tableStatsCollectors.put(new Dpid(sw.getId()), tsc); } private void stopCollectors() { - collectors.values().forEach(FlowStatsCollector::stop); - collectors.clear(); + if (adaptiveFlowSampling) { + // NewAdaptiveFlowStatsCollector Destructor + afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop); + afsCollectors.clear(); + } else { + simpleCollectors.values().forEach(FlowStatsCollector::stop); + simpleCollectors.clear(); + } + tableStatsCollectors.values().forEach(TableStatisticsCollector::stop); + tableStatsCollectors.clear(); } private void adjustRate() { DefaultLoad.setPollInterval(flowPollFrequency); - collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency)); + if (adaptiveFlowSampling) { + // NewAdaptiveFlowStatsCollector calAndPollInterval + afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency)); + } else { + simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency)); + } + tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency)); } @Override @@ -202,8 +259,9 @@ public class OpenFlowRuleProvider extends AbstractProvider } private void applyRule(FlowRule flowRule) { - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() - .uri())); + Dpid dpid = Dpid.dpid(flowRule.deviceId().uri()); + OpenFlowSwitch sw = controller.getSwitch(dpid); + FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); if (hasPayload(flowRuleExtPayLoad)) { OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); @@ -212,6 +270,14 @@ public class OpenFlowRuleProvider extends AbstractProvider } sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(), Optional.empty()).buildFlowAdd()); + + if (adaptiveFlowSampling) { + // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid); + if (collector != null) { + collector.addWithFlowRule(flowRule); + } + } } @Override @@ -222,8 +288,9 @@ public class OpenFlowRuleProvider extends AbstractProvider } private void removeRule(FlowRule flowRule) { - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() - .uri())); + Dpid dpid = Dpid.dpid(flowRule.deviceId().uri()); + OpenFlowSwitch sw = controller.getSwitch(dpid); + FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); if (hasPayload(flowRuleExtPayLoad)) { OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); @@ -232,6 +299,14 @@ public class OpenFlowRuleProvider extends AbstractProvider } sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(), Optional.empty()).buildFlowDel()); + + if (adaptiveFlowSampling) { + // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid); + if (collector != null) { + collector.removeFlows(flowRule); + } + } } @Override @@ -242,11 +317,12 @@ public class OpenFlowRuleProvider extends AbstractProvider @Override public void executeBatch(FlowRuleBatchOperation batch) { + checkNotNull(batch); pendingBatches.put(batch.id(), new InternalCacheEntry(batch)); - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId() - .uri())); + Dpid dpid = Dpid.dpid(batch.deviceId().uri()); + OpenFlowSwitch sw = controller.getSwitch(dpid); OFFlowMod mod; for (FlowRuleBatchEntry fbe : batch.getOperations()) { // flow is the third party privacy flow @@ -257,21 +333,35 @@ public class OpenFlowRuleProvider extends AbstractProvider sw.sendMsg(msg); continue; } - FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw - .factory(), Optional.of(batch.id())); + FlowModBuilder builder = + FlowModBuilder.builder(fbe.target(), sw.factory(), Optional.of(batch.id())); + NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid); switch (fbe.operator()) { case ADD: mod = builder.buildFlowAdd(); + if (adaptiveFlowSampling && collector != null) { + // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + collector.addWithFlowRule(fbe.target()); + } break; case REMOVE: mod = builder.buildFlowDel(); + if (adaptiveFlowSampling && collector != null) { + // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + collector.removeFlows(fbe.target()); + } break; case MODIFY: mod = builder.buildFlowMod(); + if (adaptiveFlowSampling && collector != null) { + // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not + collector.addOrUpdateFlows((FlowEntry) fbe.target()); + } break; default: log.error("Unsupported batch operation {}; skipping flowmod {}", - fbe.operator(), fbe); + fbe.operator(), fbe); continue; } sw.sendMsg(mod); @@ -292,14 +382,28 @@ public class OpenFlowRuleProvider extends AbstractProvider @Override public void switchAdded(Dpid dpid) { + + OpenFlowSwitch sw = controller.getSwitch(dpid); + createCollector(controller.getSwitch(dpid)); } @Override public void switchRemoved(Dpid dpid) { - FlowStatsCollector collector = collectors.remove(dpid); - if (collector != null) { - collector.stop(); + if (adaptiveFlowSampling) { + NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid); + if (collector != null) { + collector.stop(); + } + } else { + FlowStatsCollector collector = simpleCollectors.remove(dpid); + if (collector != null) { + collector.stop(); + } + } + TableStatisticsCollector tsc = tableStatsCollectors.remove(dpid); + if (tsc != null) { + tsc.stop(); } } @@ -321,10 +425,20 @@ public class OpenFlowRuleProvider extends AbstractProvider FlowEntry fr = new FlowEntryBuilder(dpid, removed).build(); providerService.flowRemoved(fr); + + if (adaptiveFlowSampling) { + // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid); + if (collector != null) { + collector.flowRemoved(fr); + } + } break; case STATS_REPLY: if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) { pushFlowMetrics(dpid, (OFFlowStatsReply) msg); + } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) { + pushTableStatistics(dpid, (OFTableStatsReply) msg); } break; case BARRIER_REPLY: @@ -370,11 +484,10 @@ public class OpenFlowRuleProvider extends AbstractProvider + " tell us which one."); } } - break; + default: log.debug("Unhandled message type: {}", msg.getType()); } - } @Override @@ -386,13 +499,68 @@ public class OpenFlowRuleProvider extends AbstractProvider private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) { DeviceId did = DeviceId.deviceId(Dpid.uri(dpid)); - OpenFlowSwitch sw = controller.getSwitch(dpid); List<FlowEntry> flowEntries = replies.getEntries().stream() .map(entry -> new FlowEntryBuilder(dpid, entry).build()) .collect(Collectors.toList()); - providerService.pushFlowMetrics(did, flowEntries); + if (adaptiveFlowSampling) { + NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid); + + synchronized (afsc) { + if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) { + log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, " + + "OFFlowStatsReply Xid={}, for {}", + afsc.getFlowMissingXid(), replies.getXid(), dpid); + } + + // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest? + if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) { + if (afsc.getFlowMissingXid() == replies.getXid()) { + // call entire flow stats update with flowMissing synchronization. + // used existing pushFlowMetrics + providerService.pushFlowMetrics(did, flowEntries); + } + // reset flowMissingXid to NO_FLOW_MISSING_XID + afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID); + + } else { + // call individual flow stats update + providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries); + } + + // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsc.pushFlowMetrics(flowEntries); + } + } else { + // call existing entire flow stats update with flowMissing synchronization + providerService.pushFlowMetrics(did, flowEntries); + } + } + + private void pushTableStatistics(Dpid dpid, OFTableStatsReply replies) { + + DeviceId did = DeviceId.deviceId(Dpid.uri(dpid)); + List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream() + .map(entry -> buildTableStatistics(did, entry)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + providerService.pushTableStatistics(did, tableStatsEntries); + } + + private TableStatisticsEntry buildTableStatistics(DeviceId deviceId, + OFTableStatsEntry ofEntry) { + TableStatisticsEntry entry = null; + if (ofEntry != null) { + entry = new DefaultTableStatisticsEntry(deviceId, + ofEntry.getTableId().getValue(), + ofEntry.getActiveCount(), + ofEntry.getLookupCount().getValue(), + ofEntry.getMatchedCount().getValue()); + } + + return entry; + } } |