aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java')
-rw-r--r--framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java453
1 files changed, 453 insertions, 0 deletions
diff --git a/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
new file mode 100644
index 00000000..de079e03
--- /dev/null
+++ b/framework/src/onos/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -0,0 +1,453 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.provider.of.flow.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleExtPayLoad;
+import org.onosproject.net.flow.FlowRuleProvider;
+import org.onosproject.net.flow.FlowRuleProviderRegistry;
+import org.onosproject.net.flow.FlowRuleProviderService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.statistic.DefaultLoad;
+import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowController;
+import org.onosproject.openflow.controller.OpenFlowEventListener;
+import org.onosproject.openflow.controller.OpenFlowSwitch;
+import org.onosproject.openflow.controller.OpenFlowSwitchListener;
+import org.onosproject.openflow.controller.RoleState;
+import org.onosproject.openflow.controller.ThirdPartyMessage;
+import org.osgi.service.component.ComponentContext;
+import org.projectfloodlight.openflow.protocol.OFBadRequestCode;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFErrorType;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
+import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFPortStatus;
+import org.projectfloodlight.openflow.protocol.OFStatsReply;
+import org.projectfloodlight.openflow.protocol.OFStatsType;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provider which uses an OpenFlow controller to detect network end-station
+ * hosts.
+ */
+@Component(immediate = true)
+public class OpenFlowRuleProvider extends AbstractProvider
+ implements FlowRuleProvider {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleProviderRegistry providerRegistry;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenFlowController controller;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
+ private static final int DEFAULT_POLL_FREQUENCY = 10;
+ @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
+ label = "Frequency (in seconds) for polling flow statistics")
+ private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
+
+ private FlowRuleProviderService providerService;
+
+ private final InternalFlowProvider listener = new InternalFlowProvider();
+
+ private Cache<Long, InternalCacheEntry> pendingBatches;
+
+ private final Timer timer = new Timer("onos-openflow-collector");
+ private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+
+ /**
+ * Creates an OpenFlow host provider.
+ */
+ public OpenFlowRuleProvider() {
+ super(new ProviderId("of", "org.onosproject.provider.openflow"));
+ }
+
+ @Activate
+ public void activate(ComponentContext context) {
+ cfgService.registerProperties(getClass());
+ providerService = providerRegistry.register(this);
+ controller.addListener(listener);
+ controller.addEventListener(listener);
+
+ pendingBatches = createBatchCache();
+ createCollectors();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ cfgService.unregisterProperties(getClass(), false);
+ stopCollectors();
+ providerRegistry.unregister(this);
+ providerService = null;
+
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+ int newFlowPollFrequency;
+ try {
+ String s = get(properties, "flowPollFrequency");
+ newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException | ClassCastException e) {
+ newFlowPollFrequency = flowPollFrequency;
+ }
+
+ if (newFlowPollFrequency != flowPollFrequency) {
+ flowPollFrequency = newFlowPollFrequency;
+ adjustRate();
+ }
+
+ log.info("Settings: flowPollFrequency={}", flowPollFrequency);
+ }
+
+ private Cache<Long, InternalCacheEntry> createBatchCache() {
+ return CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ providerService.batchOperationCompleted(notification.getKey(),
+ notification.getValue().failedCompletion());
+ }
+ }).build();
+ }
+
+ private void createCollectors() {
+ controller.getSwitches().forEach(this::createCollector);
+ }
+
+ private void createCollector(OpenFlowSwitch sw) {
+ FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+ fsc.start();
+ collectors.put(new Dpid(sw.getId()), fsc);
+ }
+
+ private void stopCollectors() {
+ collectors.values().forEach(FlowStatsCollector::stop);
+ collectors.clear();
+ }
+
+ private void adjustRate() {
+ DefaultLoad.setPollInterval(flowPollFrequency);
+ collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+ }
+
+ @Override
+ public void applyFlowRule(FlowRule... flowRules) {
+ for (FlowRule flowRule : flowRules) {
+ applyRule(flowRule);
+ }
+ }
+
+ private void applyRule(FlowRule flowRule) {
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
+ .uri()));
+ FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
+ if (hasPayload(flowRuleExtPayLoad)) {
+ OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
+ sw.sendMsg(msg);
+ return;
+ }
+ sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
+ Optional.empty()).buildFlowAdd());
+ }
+
+ @Override
+ public void removeFlowRule(FlowRule... flowRules) {
+ for (FlowRule flowRule : flowRules) {
+ removeRule(flowRule);
+ }
+ }
+
+ private void removeRule(FlowRule flowRule) {
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
+ .uri()));
+ FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
+ if (hasPayload(flowRuleExtPayLoad)) {
+ OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
+ sw.sendMsg(msg);
+ return;
+ }
+ sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
+ Optional.empty()).buildFlowDel());
+ }
+
+ @Override
+ public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
+ // TODO: optimize using the ApplicationId
+ removeFlowRule(flowRules);
+ }
+
+ @Override
+ public void executeBatch(FlowRuleBatchOperation batch) {
+
+ pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
+
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
+ .uri()));
+ OFFlowMod mod;
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ // flow is the third party privacy flow
+
+ FlowRuleExtPayLoad flowRuleExtPayLoad = fbe.target().payLoad();
+ if (hasPayload(flowRuleExtPayLoad)) {
+ OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
+ sw.sendMsg(msg);
+ continue;
+ }
+ FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
+ .factory(), Optional.of(batch.id()));
+ switch (fbe.operator()) {
+ case ADD:
+ mod = builder.buildFlowAdd();
+ break;
+ case REMOVE:
+ mod = builder.buildFlowDel();
+ break;
+ case MODIFY:
+ mod = builder.buildFlowMod();
+ break;
+ default:
+ log.error("Unsupported batch operation {}; skipping flowmod {}",
+ fbe.operator(), fbe);
+ continue;
+ }
+ sw.sendMsg(mod);
+ }
+ OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
+ .setXid(batch.id());
+ sw.sendMsg(builder.build());
+ }
+
+ private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
+ return flowRuleExtPayLoad != null &&
+ flowRuleExtPayLoad.payLoad() != null &&
+ flowRuleExtPayLoad.payLoad().length > 0;
+ }
+
+ private class InternalFlowProvider
+ implements OpenFlowSwitchListener, OpenFlowEventListener {
+
+ @Override
+ public void switchAdded(Dpid dpid) {
+ createCollector(controller.getSwitch(dpid));
+ }
+
+ @Override
+ public void switchRemoved(Dpid dpid) {
+ FlowStatsCollector collector = collectors.remove(dpid);
+ if (collector != null) {
+ collector.stop();
+ }
+ }
+
+ @Override
+ public void switchChanged(Dpid dpid) {
+ }
+
+ @Override
+ public void portChanged(Dpid dpid, OFPortStatus status) {
+ // TODO: Decide whether to evict flows internal store.
+ }
+
+ @Override
+ public void handleMessage(Dpid dpid, OFMessage msg) {
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+ switch (msg.getType()) {
+ case FLOW_REMOVED:
+ OFFlowRemoved removed = (OFFlowRemoved) msg;
+
+ FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
+ providerService.flowRemoved(fr);
+ break;
+ case STATS_REPLY:
+ if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+ pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
+ }
+ break;
+ case BARRIER_REPLY:
+ try {
+ InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+ if (entry != null) {
+ providerService
+ .batchOperationCompleted(msg.getXid(),
+ entry.completed());
+ } else {
+ log.warn("Received unknown Barrier Reply: {}",
+ msg.getXid());
+ }
+ } finally {
+ pendingBatches.invalidate(msg.getXid());
+ }
+ break;
+ case ERROR:
+ // TODO: This needs to get suppressed in a better way.
+ if (msg instanceof OFBadRequestErrorMsg &&
+ ((OFBadRequestErrorMsg) msg).getCode() == OFBadRequestCode.BAD_TYPE) {
+ log.debug("Received error message {} from {}", msg, dpid);
+ } else {
+ log.warn("Received error message {} from {}", msg, dpid);
+ }
+
+ OFErrorMsg error = (OFErrorMsg) msg;
+ if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+ OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+ if (fmFailed.getData().getParsedMessage().isPresent()) {
+ OFMessage m = fmFailed.getData().getParsedMessage().get();
+ OFFlowMod fm = (OFFlowMod) m;
+ InternalCacheEntry entry =
+ pendingBatches.getIfPresent(msg.getXid());
+ if (entry != null) {
+ entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
+ } else {
+ log.error("No matching batch for this error: {}", error);
+ }
+ } else {
+ // FIXME: Potentially add flowtracking to avoid this message.
+ log.error("Flow installation failed but switch didn't"
+ + " tell us which one.");
+ }
+ }
+ break;
+ default:
+ log.debug("Unhandled message type: {}", msg.getType());
+ }
+
+ }
+
+ @Override
+ public void receivedRoleReply(Dpid dpid, RoleState requested,
+ RoleState response) {
+ // Do nothing here for now.
+ }
+
+ private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) {
+
+ DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+
+ List<FlowEntry> flowEntries = replies.getEntries().stream()
+ .map(entry -> new FlowEntryBuilder(dpid, entry).build())
+ .collect(Collectors.toList());
+
+ providerService.pushFlowMetrics(did, flowEntries);
+ }
+ }
+
+ /**
+ * The internal cache entry holding the original request as well as
+ * accumulating the any failures along the way.
+ * <p/>
+ * If this entry is evicted from the cache then the entire operation is
+ * considered failed. Otherwise, only the failures reported by the device
+ * will be propagated up.
+ */
+ private class InternalCacheEntry {
+
+ private final FlowRuleBatchOperation operation;
+ private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
+
+ public InternalCacheEntry(FlowRuleBatchOperation operation) {
+ this.operation = operation;
+ }
+
+ /**
+ * Appends a failed rule to the set of failed items.
+ *
+ * @param rule the failed rule
+ */
+ public void appendFailure(FlowRule rule) {
+ failures.add(rule);
+ }
+
+ /**
+ * Fails the entire batch and returns the failed operation.
+ *
+ * @return the failed operation
+ */
+ public CompletedBatchOperation failedCompletion() {
+ Set<FlowRule> fails = operation.getOperations().stream()
+ .map(op -> op.target()).collect(Collectors.toSet());
+ return new CompletedBatchOperation(false,
+ Collections
+ .unmodifiableSet(fails),
+ operation.deviceId());
+ }
+
+ /**
+ * Returns the completed operation and whether the batch suceeded.
+ *
+ * @return the completed operation
+ */
+ public CompletedBatchOperation completed() {
+ return new CompletedBatchOperation(
+ failures.isEmpty(),
+ Collections
+ .unmodifiableSet(failures),
+ operation.deviceId());
+ }
+ }
+
+}