aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java')
-rw-r--r--framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java341
1 files changed, 292 insertions, 49 deletions
diff --git a/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java b/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
index e63a404c..cf3c7e89 100644
--- a/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
+++ b/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.onlab.osgi.ServiceDirectory;
@@ -147,6 +149,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
private static final int L3UNICASTMASK = 0x20000000;
//private static final int MPLSINTERFACEMASK = 0x90000000;
private static final int L3ECMPMASK = 0x70000000;
+ private static final int L2FLOODMASK = 0x40000000;
private final Logger log = getLogger(getClass());
private ServiceDirectory serviceDirectory;
@@ -176,6 +179,13 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
private Set<IPCriterion> sentIpFilters = Collections.newSetFromMap(
new ConcurrentHashMap<IPCriterion, Boolean>());
+ // local stores for port-vlan mapping
+ Map<PortNumber, VlanId> port2Vlan = new ConcurrentHashMap<PortNumber, VlanId>();
+ Map<VlanId, Set<PortNumber>> vlan2Port = new ConcurrentHashMap<VlanId,
+ Set<PortNumber>>();
+
+
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
this.serviceDirectory = context.directory();
@@ -275,26 +285,23 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
@Override
public void next(NextObjective nextObjective) {
- switch (nextObjective.type()) {
- case SIMPLE:
- Collection<TrafficTreatment> treatments = nextObjective.next();
- if (treatments.size() != 1) {
- log.error("Next Objectives of type Simple should only have a "
- + "single Traffic Treatment. Next Objective Id:{}", nextObjective.id());
- fail(nextObjective, ObjectiveError.BADPARAMS);
- return;
+ log.debug("Processing NextObjective id{} op{}", nextObjective.id(),
+ nextObjective.op());
+ if (nextObjective.op() == Objective.Operation.REMOVE) {
+ if (nextObjective.next().isEmpty()) {
+ removeGroup(nextObjective);
+ } else {
+ removeBucketFromGroup(nextObjective);
}
- processSimpleNextObjective(nextObjective);
- break;
- case HASHED:
- case BROADCAST:
- case FAILOVER:
- fail(nextObjective, ObjectiveError.UNSUPPORTED);
- log.warn("Unsupported next objective type {}", nextObjective.type());
- break;
- default:
- fail(nextObjective, ObjectiveError.UNKNOWN);
- log.warn("Unknown next objective type {}", nextObjective.type());
+ } else if (nextObjective.op() == Objective.Operation.ADD) {
+ NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id());
+ if (nextGroup != null) {
+ addBucketToGroup(nextObjective);
+ } else {
+ addGroup(nextObjective);
+ }
+ } else {
+ log.warn("Unsupported operation {}", nextObjective.op());
}
}
@@ -302,6 +309,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
// Flow handling
//////////////////////////////////////
+
/**
* As per OFDPA 2.0 TTP, filtering of VLAN ids, MAC addresses (for routing)
* and IP addresses configured on switch ports happen in different tables.
@@ -455,14 +463,19 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
selector.matchVlanId(vidCriterion.vlanId());
+ treatment.transition(TMAC_TABLE);
+
+ VlanId storeVlan = null;
if (vidCriterion.vlanId() == VlanId.NONE) {
// untagged packets are assigned vlans
treatment.pushVlan().setVlanId(assignedVlan);
// XXX ofdpa will require an additional vlan match on the assigned vlan
// and it may not require the push. This is not in compliance with OF
// standard. Waiting on what the exact flows are going to look like.
+ storeVlan = assignedVlan;
+ } else {
+ storeVlan = vidCriterion.vlanId();
}
- treatment.transition(TMAC_TABLE);
// ofdpa cannot match on ALL portnumber, so we need to use separate
// rules for each port.
@@ -476,7 +489,20 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
} else {
portnums.add(portCriterion.port());
}
+
for (PortNumber pnum : portnums) {
+ // update storage
+ port2Vlan.put(pnum, storeVlan);
+ Set<PortNumber> vlanPorts = vlan2Port.get(storeVlan);
+ if (vlanPorts == null) {
+ vlanPorts = Collections.newSetFromMap(
+ new ConcurrentHashMap<PortNumber, Boolean>());
+ vlanPorts.add(pnum);
+ vlan2Port.put(storeVlan, vlanPorts);
+ } else {
+ vlanPorts.add(pnum);
+ }
+ // create rest of flowrule
selector.matchInPort(pnum);
FlowRule rule = DefaultFlowRule.builder()
.forDevice(deviceId)
@@ -708,10 +734,39 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
// Group handling
//////////////////////////////////////
+ private void addGroup(NextObjective nextObjective) {
+ switch (nextObjective.type()) {
+ case SIMPLE:
+ Collection<TrafficTreatment> treatments = nextObjective.next();
+ if (treatments.size() != 1) {
+ log.error("Next Objectives of type Simple should only have a "
+ + "single Traffic Treatment. Next Objective Id:{}",
+ nextObjective.id());
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ return;
+ }
+ processSimpleNextObjective(nextObjective);
+ break;
+ case BROADCAST:
+ processBroadcastNextObjective(nextObjective);
+ break;
+ case HASHED:
+ processHashedNextObjective(nextObjective);
+ break;
+ case FAILOVER:
+ fail(nextObjective, ObjectiveError.UNSUPPORTED);
+ log.warn("Unsupported next objective type {}", nextObjective.type());
+ break;
+ default:
+ fail(nextObjective, ObjectiveError.UNKNOWN);
+ log.warn("Unknown next objective type {}", nextObjective.type());
+ }
+ }
+
/**
* As per the OFDPA 2.0 TTP, packets are sent out of ports by using
* a chain of groups, namely an L3 Unicast Group that points to an L2 Interface
- * Group which in turns points to an output port. The Next Objective passed
+ * Group which in-turn points to an output port. The Next Objective passed
* in by the application has to be broken up into a group chain
* to satisfy this TTP.
*
@@ -770,7 +825,9 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
Integer l3groupId = L3UNICASTMASK | (int) portNum;
l3utt.group(new DefaultGroupId(l2groupId));
GroupChainElem gce = new GroupChainElem(l3groupkey, l3groupId,
- l3utt.build(), nextObj.appId());
+ GroupDescription.Type.INDIRECT,
+ Collections.singletonList(l3utt.build()),
+ nextObj.appId(), 1);
// create object for local and distributed storage
List<GroupKey> gkeys = new ArrayList<GroupKey>();
@@ -797,27 +854,201 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
/**
+ * As per the OFDPA 2.0 TTP, packets are sent out of ports by using
+ * a chain of groups. The Next Objective passed in by the application
+ * has to be broken up into a group chain comprising of an
+ * L2 Flood group whose buckets point to L2 Interface groups.
+ *
+ * @param nextObj the nextObjective of type BROADCAST
+ */
+ private void processBroadcastNextObjective(NextObjective nextObj) {
+ // break up broadcast next objective to multiple groups
+ Collection<TrafficTreatment> buckets = nextObj.next();
+
+ // each treatment is converted to an L2 interface group
+ int indicator = 0;
+ VlanId vlanid = null;
+ List<GroupInfo> groupInfoCollection = new ArrayList<>();
+ for (TrafficTreatment treatment : buckets) {
+ TrafficTreatment.Builder newTreatment = DefaultTrafficTreatment.builder();
+ PortNumber portNum = null;
+ // ensure that the only allowed treatments are pop-vlan and output
+ for (Instruction ins : treatment.allInstructions()) {
+ if (ins.type() == Instruction.Type.L2MODIFICATION) {
+ L2ModificationInstruction l2ins = (L2ModificationInstruction) ins;
+ switch (l2ins.subtype()) {
+ case VLAN_POP:
+ newTreatment.add(l2ins);
+ break;
+ default:
+ log.debug("action {} not permitted for broadcast nextObj",
+ l2ins.subtype());
+ break;
+ }
+ } else if (ins.type() == Instruction.Type.OUTPUT) {
+ portNum = ((OutputInstruction) ins).port();
+ newTreatment.add(ins);
+ } else {
+ log.debug("TrafficTreatment of type {} not permitted in "
+ + " broadcast nextObjective", ins.type());
+ }
+ }
+
+ // also ensure that all ports are in the same vlan
+ VlanId thisvlanid = port2Vlan.get(portNum);
+ if (vlanid == null) {
+ vlanid = thisvlanid;
+ } else {
+ if (!vlanid.equals(thisvlanid)) {
+ log.error("Driver requires all ports in a broadcast nextObj "
+ + "to be in the same vlan. Different vlans found "
+ + "{} and {}. Aborting group creation", vlanid, thisvlanid);
+ return;
+ }
+ }
+
+ // assemble info for all l2 interface groups
+ indicator += GROUP1MASK;
+ int l2gk = nextObj.id() | indicator;
+ final GroupKey l2groupkey = new DefaultGroupKey(appKryo.serialize(l2gk));
+ Integer l2groupId = L2INTERFACEMASK | (vlanid.toShort() << 16) |
+ (int) portNum.toLong();
+ GroupBucket newbucket =
+ DefaultGroupBucket.createIndirectGroupBucket(newTreatment.build());
+
+ // store the info needed to create this group
+ groupInfoCollection.add(new GroupInfo(l2groupId, l2groupkey, newbucket));
+ }
+
+ // assemble info for l2 flood group
+ int l2floodgk = nextObj.id() | GROUP0MASK;
+ final GroupKey l2floodgroupkey = new DefaultGroupKey(appKryo.serialize(l2floodgk));
+ Integer l2floodgroupId = L2FLOODMASK | (vlanid.toShort() << 16) | nextObj.id();
+ // collection of treatment with groupids of l2 interface groups
+ List<TrafficTreatment> floodtt = new ArrayList<>();
+ for (GroupInfo gi : groupInfoCollection) {
+ TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
+ ttb.group(new DefaultGroupId(gi.groupId));
+ floodtt.add(ttb.build());
+ }
+ GroupChainElem gce = new GroupChainElem(l2floodgroupkey, l2floodgroupId,
+ GroupDescription.Type.ALL,
+ floodtt,
+ nextObj.appId(),
+ groupInfoCollection.size());
+
+ // create objects for local and distributed storage
+ List<GroupKey> gkeys = new ArrayList<GroupKey>();
+ gkeys.add(l2floodgroupkey); // group0 in chain
+ OfdpaGroupChain ofdpaGrp = new OfdpaGroupChain(gkeys, nextObj);
+
+ // store l2floodgroupkey with the ofdpaGroupChain for the nextObjective
+ // that depends on it
+ pendingNextObjectives.put(l2floodgroupkey, ofdpaGrp);
+
+ for (GroupInfo gi : groupInfoCollection) {
+ // store all l2groupkeys with the groupChainElem for the l2floodgroup
+ // that depends on it
+ pendingGroups.put(gi.groupKey, gce);
+
+ // create and send groups for all l2 interface groups
+ GroupDescription groupDescription =
+ new DefaultGroupDescription(
+ deviceId,
+ GroupDescription.Type.INDIRECT,
+ new GroupBuckets(Collections.singletonList(gi.groupBucket)),
+ gi.groupKey,
+ gi.groupId,
+ nextObj.appId());
+ groupService.addGroup(groupDescription);
+ }
+ }
+
+ private class GroupInfo {
+ private Integer groupId;
+ private GroupKey groupKey;
+ private GroupBucket groupBucket;
+
+ GroupInfo(Integer groupId, GroupKey groupKey, GroupBucket groupBucket) {
+ this.groupBucket = groupBucket;
+ this.groupId = groupId;
+ this.groupKey = groupKey;
+ }
+ }
+
+ private void processHashedNextObjective(NextObjective nextObj) {
+ // TODO Auto-generated method stub
+ }
+
+ private void addBucketToGroup(NextObjective nextObjective) {
+ // TODO Auto-generated method stub
+ }
+
+ private void removeBucketFromGroup(NextObjective nextObjective) {
+ // TODO Auto-generated method stub
+ }
+
+ private void removeGroup(NextObjective nextObjective) {
+ // TODO Auto-generated method stub
+ }
+
+ /**
* Processes next element of a group chain. Assumption is that if this
* group points to another group, the latter has already been created
* and this driver has received notification for it. A second assumption is
* that if there is another group waiting for this group then the appropriate
* stores already have the information to act upon the notification for the
* creating of this group.
+ * <p>
+ * The processing of the GroupChainElement depends on the number of groups
+ * this element is waiting on. For all group types other than SIMPLE, a
+ * GroupChainElement could be waiting on multiple groups.
*
* @param gce the group chain element to be processed next
*/
private void processGroupChain(GroupChainElem gce) {
- GroupBucket bucket = DefaultGroupBucket
- .createIndirectGroupBucket(gce.getBucketActions());
- GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
- GroupDescription.Type.INDIRECT,
- new GroupBuckets(Collections.singletonList(bucket)),
- gce.getGkey(),
- gce.getGivenGroupId(),
- gce.getAppId());
- groupService.addGroup(groupDesc);
- }
+ int waitOnGroups = gce.decrementAndGetGroupsWaitedOn();
+ if (waitOnGroups != 0) {
+ log.debug("GCE: {} waiting on {} groups. Not processing yet",
+ gce, waitOnGroups);
+ return;
+ }
+ List<GroupBucket> buckets = new ArrayList<>();
+ switch (gce.groupType) {
+ case INDIRECT:
+ GroupBucket ibucket = DefaultGroupBucket
+ .createIndirectGroupBucket(gce.bucketActions.iterator().next());
+ buckets.add(ibucket);
+ break;
+ case ALL:
+ for (TrafficTreatment tt : gce.bucketActions) {
+ GroupBucket abucket = DefaultGroupBucket
+ .createAllGroupBucket(tt);
+ buckets.add(abucket);
+ }
+ break;
+ case SELECT:
+ for (TrafficTreatment tt : gce.bucketActions) {
+ GroupBucket sbucket = DefaultGroupBucket
+ .createSelectGroupBucket(tt);
+ buckets.add(sbucket);
+ }
+ break;
+ case FAILOVER:
+ default:
+ log.error("Unknown or unimplemented GroupChainElem {}", gce);
+ }
+ if (buckets.size() > 0) {
+ GroupDescription groupDesc = new DefaultGroupDescription(
+ deviceId, gce.groupType,
+ new GroupBuckets(buckets),
+ gce.gkey,
+ gce.givenGroupId,
+ gce.appId);
+ groupService.addGroup(groupDesc);
+ }
+ }
private class GroupChecker implements Runnable {
@Override
@@ -837,7 +1068,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
log.info("Group service processed group key {}. Processing next "
+ "group in group chain with group key {}",
appKryo.deserialize(key.key()),
- appKryo.deserialize(gce.getGkey().key()));
+ appKryo.deserialize(gce.gkey.key()));
processGroupChain(gce);
} else {
OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key);
@@ -866,7 +1097,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
log.info("group ADDED with group key {} .. "
+ "Processing next group in group chain with group key {}",
appKryo.deserialize(key.key()),
- appKryo.deserialize(gce.getGkey().key()));
+ appKryo.deserialize(gce.gkey.key()));
processGroupChain(gce);
} else {
OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key);
@@ -890,6 +1121,11 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* look like group0 --> group 1 --> outPort. Information about the groups
* themselves can be fetched from the Group Service using the group keys from
* objects instantiating this class.
+ *
+ * XXX Revisit this - since the forwarding objective only ever needs the
+ * groupkey of the top-level group in the group chain, why store a series
+ * of groupkeys. Also the group-chain list only works for 1-to-1 chaining,
+ * not for 1-to-many chaining.
*/
private class OfdpaGroupChain implements NextGroup {
private final NextObjective nextObj;
@@ -925,33 +1161,40 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* preceding groups in the group chain to be created.
*/
private class GroupChainElem {
- private TrafficTreatment bucketActions;
+ private Collection<TrafficTreatment> bucketActions;
private Integer givenGroupId;
+ private GroupDescription.Type groupType;
private GroupKey gkey;
private ApplicationId appId;
+ private AtomicInteger waitOnGroups;
- public GroupChainElem(GroupKey gkey, Integer givenGroupId,
- TrafficTreatment tr, ApplicationId appId) {
+ GroupChainElem(GroupKey gkey, Integer givenGroupId,
+ GroupDescription.Type groupType,
+ Collection<TrafficTreatment> tr, ApplicationId appId,
+ int waitOnGroups) {
this.bucketActions = tr;
this.givenGroupId = givenGroupId;
+ this.groupType = groupType;
this.gkey = gkey;
this.appId = appId;
+ this.waitOnGroups = new AtomicInteger(waitOnGroups);
}
- public TrafficTreatment getBucketActions() {
- return bucketActions;
- }
-
- public Integer getGivenGroupId() {
- return givenGroupId;
+ /**
+ * This methods atomically decrements the counter for the number of
+ * groups this GroupChainElement is waiting on, for notifications from
+ * the Group Service. When this method returns a value of 0, this
+ * GroupChainElement is ready to be processed.
+ *
+ * @return integer indication of the number of notifications being waited on
+ */
+ int decrementAndGetGroupsWaitedOn() {
+ return waitOnGroups.decrementAndGet();
}
- public GroupKey getGkey() {
- return gkey;
- }
-
- public ApplicationId getAppId() {
- return appId;
+ @Override
+ public String toString() {
+ return Integer.toHexString(givenGroupId);
}
}