aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
committerAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
commit13d05bc8458758ee39cb829098241e89616717ee (patch)
tree22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
parent6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff)
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java')
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java593
1 files changed, 593 insertions, 0 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
new file mode 100644
index 00000000..a1d046c5
--- /dev/null
+++ b/framework/src/onos/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -0,0 +1,593 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.net.provider.AbstractListenerProviderRegistry;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleProvider;
+import org.onosproject.net.flow.FlowRuleProviderRegistry;
+import org.onosproject.net.flow.FlowRuleProviderService;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.FlowRuleStore;
+import org.onosproject.net.flow.FlowRuleStoreDelegate;
+import org.onosproject.net.provider.AbstractProviderService;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_ADD_REQUESTED;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVE_REQUESTED;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.security.AppPermission.Type.*;
+
+
+
+/**
+ * Provides implementation of the flow NB &amp; SB APIs.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class FlowRuleManager
+ extends AbstractListenerProviderRegistry<FlowRuleEvent, FlowRuleListener,
+ FlowRuleProvider, FlowRuleProviderService>
+ implements FlowRuleService, FlowRuleProviderRegistry {
+
+ public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
+ private static final boolean ALLOW_EXTRANEOUS_RULES = false;
+
+ @Property(name = "allowExtraneousRules", boolValue = ALLOW_EXTRANEOUS_RULES,
+ label = "Allow flow rules in switch not installed by ONOS")
+ private boolean allowExtraneousRules = ALLOW_EXTRANEOUS_RULES;
+
+ private final Logger log = getLogger(getClass());
+
+ private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
+
+ protected ExecutorService deviceInstallers =
+ Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d"));
+
+ protected ExecutorService operationsService =
+ Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d"));
+
+ private IdGenerator idGenerator;
+
+ private Map<Long, FlowOperationsProcessor> pendingFlowOperations
+ = new ConcurrentHashMap<>();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ cfgService.registerProperties(getClass());
+ idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+ modified(context);
+
+ store.setDelegate(delegate);
+ eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ cfgService.unregisterProperties(getClass(), false);
+ deviceInstallers.shutdownNow();
+ operationsService.shutdownNow();
+ store.unsetDelegate(delegate);
+ eventDispatcher.removeSink(FlowRuleEvent.class);
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ if (context == null) {
+ return;
+ }
+
+ Dictionary<?, ?> properties = context.getProperties();
+
+ String s = Tools.get(properties, "allowExtraneousRules");
+ allowExtraneousRules = Strings.isNullOrEmpty(s) ? ALLOW_EXTRANEOUS_RULES : Boolean.valueOf(s);
+
+ if (allowExtraneousRules) {
+ log.info("Allowing flow rules not installed by ONOS");
+ }
+ }
+
+ @Override
+ public int getFlowRuleCount() {
+ checkPermission(FLOWRULE_READ);
+ return store.getFlowRuleCount();
+ }
+
+ @Override
+ public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ checkPermission(FLOWRULE_READ);
+ return store.getFlowEntries(deviceId);
+ }
+
+ @Override
+ public void applyFlowRules(FlowRule... flowRules) {
+ checkPermission(FLOWRULE_WRITE);
+
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ for (int i = 0; i < flowRules.length; i++) {
+ builder.add(flowRules[i]);
+ }
+ apply(builder.build());
+ }
+
+ @Override
+ public void removeFlowRules(FlowRule... flowRules) {
+ checkPermission(FLOWRULE_WRITE);
+
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ for (int i = 0; i < flowRules.length; i++) {
+ builder.remove(flowRules[i]);
+ }
+ apply(builder.build());
+ }
+
+ @Override
+ public void removeFlowRulesById(ApplicationId id) {
+ checkPermission(FLOWRULE_WRITE);
+ removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
+ }
+
+ @Override
+ public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
+ checkPermission(FLOWRULE_READ);
+
+ Set<FlowRule> flowEntries = Sets.newHashSet();
+ for (Device d : deviceService.getDevices()) {
+ for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
+ if (flowEntry.appId() == id.id()) {
+ flowEntries.add(flowEntry);
+ }
+ }
+ }
+ return flowEntries;
+ }
+
+ @Override
+ public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
+ checkPermission(FLOWRULE_READ);
+
+ Set<FlowRule> matches = Sets.newHashSet();
+ long toLookUp = ((long) appId.id() << 16) | groupId;
+ for (Device d : deviceService.getDevices()) {
+ for (FlowEntry flowEntry : store.getFlowEntries(d.id())) {
+ if ((flowEntry.id().value() >>> 32) == toLookUp) {
+ matches.add(flowEntry);
+ }
+ }
+ }
+ return matches;
+ }
+
+ @Override
+ public void apply(FlowRuleOperations ops) {
+ checkPermission(FLOWRULE_WRITE);
+ operationsService.submit(new FlowOperationsProcessor(ops));
+ }
+
+ @Override
+ protected FlowRuleProviderService createProviderService(
+ FlowRuleProvider provider) {
+ return new InternalFlowRuleProviderService(provider);
+ }
+
+ private class InternalFlowRuleProviderService
+ extends AbstractProviderService<FlowRuleProvider>
+ implements FlowRuleProviderService {
+
+ final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
+
+ protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
+ super(provider);
+ }
+
+ @Override
+ public void flowRemoved(FlowEntry flowEntry) {
+ checkNotNull(flowEntry, FLOW_RULE_NULL);
+ checkValidity();
+ lastSeen.remove(flowEntry);
+ FlowEntry stored = store.getFlowEntry(flowEntry);
+ if (stored == null) {
+ log.debug("Rule already evicted from store: {}", flowEntry);
+ return;
+ }
+ Device device = deviceService.getDevice(flowEntry.deviceId());
+ FlowRuleProvider frp = getProvider(device.providerId());
+ FlowRuleEvent event = null;
+ switch (stored.state()) {
+ case ADDED:
+ case PENDING_ADD:
+ frp.applyFlowRule(stored);
+ break;
+ case PENDING_REMOVE:
+ case REMOVED:
+ event = store.removeFlowRule(stored);
+ break;
+ default:
+ break;
+
+ }
+ if (event != null) {
+ log.debug("Flow {} removed", flowEntry);
+ post(event);
+ }
+ }
+
+
+ private void flowMissing(FlowEntry flowRule) {
+ checkNotNull(flowRule, FLOW_RULE_NULL);
+ checkValidity();
+ Device device = deviceService.getDevice(flowRule.deviceId());
+ FlowRuleProvider frp = getProvider(device.providerId());
+ FlowRuleEvent event = null;
+ switch (flowRule.state()) {
+ case PENDING_REMOVE:
+ case REMOVED:
+ event = store.removeFlowRule(flowRule);
+ frp.removeFlowRule(flowRule);
+ break;
+ case ADDED:
+ case PENDING_ADD:
+ try {
+ frp.applyFlowRule(flowRule);
+ } catch (UnsupportedOperationException e) {
+ log.warn(e.getMessage());
+ if (flowRule instanceof DefaultFlowEntry) {
+ ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED);
+ }
+ }
+ break;
+ default:
+ log.debug("Flow {} has not been installed.", flowRule);
+ }
+
+ if (event != null) {
+ log.debug("Flow {} removed", flowRule);
+ post(event);
+ }
+
+ }
+
+
+ private void extraneousFlow(FlowRule flowRule) {
+ checkNotNull(flowRule, FLOW_RULE_NULL);
+ checkValidity();
+ FlowRuleProvider frp = getProvider(flowRule.deviceId());
+ frp.removeFlowRule(flowRule);
+ log.debug("Flow {} is on switch but not in store.", flowRule);
+ }
+
+
+ private void flowAdded(FlowEntry flowEntry) {
+ checkNotNull(flowEntry, FLOW_RULE_NULL);
+ checkValidity();
+
+ if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) {
+
+ FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
+ if (event == null) {
+ log.debug("No flow store event generated.");
+ } else {
+ log.trace("Flow {} {}", flowEntry, event.type());
+ post(event);
+ }
+ } else {
+ log.debug("Removing flow rules....");
+ removeFlowRules(flowEntry);
+ }
+
+ }
+
+ private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
+ if (storedRule == null) {
+ return false;
+ }
+ if (storedRule.isPermanent()) {
+ return true;
+ }
+
+ final long timeout = storedRule.timeout() * 1000;
+ final long currentTime = System.currentTimeMillis();
+ if (storedRule.packets() != swRule.packets()) {
+ lastSeen.put(storedRule, currentTime);
+ return true;
+ }
+ if (!lastSeen.containsKey(storedRule)) {
+ // checking for the first time
+ lastSeen.put(storedRule, storedRule.lastSeen());
+ // Use following if lastSeen attr. was removed.
+ //lastSeen.put(storedRule, currentTime);
+ }
+ Long last = lastSeen.get(storedRule);
+ if (last == null) {
+ // concurrently removed? let the liveness check fail
+ return false;
+ }
+
+ if ((currentTime - last) <= timeout) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
+ Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
+ store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
+
+ for (FlowEntry rule : flowEntries) {
+ try {
+ FlowEntry storedRule = storedRules.remove(rule);
+ if (storedRule != null) {
+ if (storedRule.exactMatch(rule)) {
+ // we both have the rule, let's update some info then.
+ flowAdded(rule);
+ } else {
+ // the two rules are not an exact match - remove the
+ // switch's rule and install our rule
+ extraneousFlow(rule);
+ flowMissing(storedRule);
+ }
+ } else {
+ // the device has a rule the store does not have
+ if (!allowExtraneousRules) {
+ extraneousFlow(rule);
+ }
+ }
+ } catch (Exception e) {
+ log.debug("Can't process added or extra rule {}", e.getMessage());
+ continue;
+ }
+ }
+ for (FlowEntry rule : storedRules.keySet()) {
+ try {
+ // there are rules in the store that aren't on the switch
+ log.debug("Adding rule in store, but not on switch {}", rule);
+ flowMissing(rule);
+ } catch (Exception e) {
+ log.debug("Can't add missing flow rule {}", e.getMessage());
+ continue;
+ }
+ }
+
+ }
+
+ @Override
+ public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+ operation
+ ));
+ }
+ }
+
+ // Store delegate to re-post events emitted from the store.
+ private class InternalStoreDelegate implements FlowRuleStoreDelegate {
+
+
+ // TODO: Right now we only dispatch events at individual flowEntry level.
+ // It may be more efficient for also dispatch events as a batch.
+ @Override
+ public void notify(FlowRuleBatchEvent event) {
+ final FlowRuleBatchRequest request = event.subject();
+ switch (event.type()) {
+ case BATCH_OPERATION_REQUESTED:
+ // Request has been forwarded to MASTER Node, and was
+ request.ops().stream().forEach(
+ op -> {
+ switch (op.operator()) {
+
+ case ADD:
+ post(new FlowRuleEvent(RULE_ADD_REQUESTED,
+ op.target()));
+ break;
+ case REMOVE:
+ post(new FlowRuleEvent(RULE_REMOVE_REQUESTED,
+ op.target()));
+ break;
+ case MODIFY:
+ //TODO: do something here when the time comes.
+ break;
+ default:
+ log.warn("Unknown flow operation operator: {}", op.operator());
+ }
+ }
+ );
+
+ DeviceId deviceId = event.deviceId();
+
+ FlowRuleBatchOperation batchOperation =
+ request.asBatchOperation(deviceId);
+
+ FlowRuleProvider flowRuleProvider = getProvider(deviceId);
+ if (flowRuleProvider != null) {
+ flowRuleProvider.executeBatch(batchOperation);
+ }
+
+ break;
+
+ case BATCH_OPERATION_COMPLETED:
+
+ FlowOperationsProcessor fops = pendingFlowOperations.remove(
+ event.subject().batchId());
+ if (event.result().isSuccess()) {
+ if (fops != null) {
+ fops.satisfy(event.deviceId());
+ }
+ } else {
+ fops.fail(event.deviceId(), event.result().failedItems());
+ }
+
+ break;
+
+ default:
+ break;
+ }
+ }
+ }
+
+ private class FlowOperationsProcessor implements Runnable {
+
+ private final List<Set<FlowRuleOperation>> stages;
+ private final FlowRuleOperationsContext context;
+ private final FlowRuleOperations fops;
+ private final AtomicBoolean hasFailed = new AtomicBoolean(false);
+
+ private Set<DeviceId> pendingDevices;
+
+ public FlowOperationsProcessor(FlowRuleOperations ops) {
+ this.stages = Lists.newArrayList(ops.stages());
+ this.context = ops.callback();
+ this.fops = ops;
+ pendingDevices = Sets.newConcurrentHashSet();
+ }
+
+ @Override
+ public void run() {
+ if (stages.size() > 0) {
+ process(stages.remove(0));
+ } else if (!hasFailed.get() && context != null) {
+ context.onSuccess(fops);
+ }
+ }
+
+ private void process(Set<FlowRuleOperation> ops) {
+ Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+ ArrayListMultimap.create();
+
+ FlowRuleBatchEntry fbe;
+ for (FlowRuleOperation flowRuleOperation : ops) {
+ switch (flowRuleOperation.type()) {
+ // FIXME: Brian needs imagination when creating class names.
+ case ADD:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+ break;
+ case MODIFY:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+ break;
+ case REMOVE:
+ fbe = new FlowRuleBatchEntry(
+ FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+ }
+ pendingDevices.add(flowRuleOperation.rule().deviceId());
+ perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+ }
+
+
+ for (DeviceId deviceId : perDeviceBatches.keySet()) {
+ long id = idGenerator.getNewId();
+ final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+ deviceId, id);
+ pendingFlowOperations.put(id, this);
+ deviceInstallers.submit(() -> store.storeBatch(b));
+ }
+ }
+
+ public void satisfy(DeviceId devId) {
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
+ }
+ }
+
+
+
+ public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+ hasFailed.set(true);
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ operationsService.submit(this);
+ }
+
+ if (context != null) {
+ final FlowRuleOperations.Builder failedOpsBuilder =
+ FlowRuleOperations.builder();
+ failures.stream().forEach(failedOpsBuilder::add);
+
+ context.onError(failedOpsBuilder.build());
+ }
+ }
+
+ }
+}