From e63291850fd0795c5700e25e67e5dee89ba54c5f Mon Sep 17 00:00:00 2001 From: Ashlee Young Date: Tue, 1 Dec 2015 05:49:27 -0800 Subject: onos commit hash c2999f30c69e50df905a9d175ef80b3f23a98514 Change-Id: I2bb8562c4942b6d6a6d60b663db2e17540477b81 Signed-off-by: Ashlee Young --- .../driver/pipeline/OFDPA2Pipeline.java | 818 +++++++++++++++------ 1 file changed, 586 insertions(+), 232 deletions(-) (limited to 'framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java') diff --git a/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java b/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java index cf3c7e89..863caebb 100644 --- a/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java +++ b/framework/src/onos/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java @@ -19,9 +19,11 @@ import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Set; @@ -65,15 +67,20 @@ import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.flow.TrafficTreatment; import org.onosproject.net.flow.criteria.Criteria; import org.onosproject.net.flow.criteria.Criterion; +import org.onosproject.net.flow.criteria.Criterion.Type; import org.onosproject.net.flow.criteria.EthCriterion; import org.onosproject.net.flow.criteria.EthTypeCriterion; import org.onosproject.net.flow.criteria.IPCriterion; +import org.onosproject.net.flow.criteria.MplsBosCriterion; +import org.onosproject.net.flow.criteria.MplsCriterion; import org.onosproject.net.flow.criteria.PortCriterion; import org.onosproject.net.flow.criteria.VlanIdCriterion; import org.onosproject.net.flow.instructions.Instruction; import org.onosproject.net.flow.instructions.Instructions.OutputInstruction; import org.onosproject.net.flow.instructions.L2ModificationInstruction; +import org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType; import org.onosproject.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction; +import org.onosproject.net.flow.instructions.L2ModificationInstruction.ModMplsLabelInstruction; import org.onosproject.net.flow.instructions.L2ModificationInstruction.ModVlanIdInstruction; import org.onosproject.net.flowobjective.FilteringObjective; import org.onosproject.net.flowobjective.FlowObjectiveStore; @@ -127,51 +134,44 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline protected static final int DEFAULT_PRIORITY = 0x8000; protected static final int LOWEST_PRIORITY = 0x0; - /* - * Group keys are normally generated by using the next Objective id. In the - * case of a next objective resulting in a group chain, each group derives a - * group key from the next objective id in the following way: - * The upper 4 bits of the group-key are used to denote the position of the - * group in the group chain. For example, in the chain - * group0 --> group1 --> group2 --> port - * group0's group key would have the upper 4 bits as 0, group1's upper four - * bits would be 1, and so on - */ - private static final int GROUP0MASK = 0x0; - private static final int GROUP1MASK = 0x10000000; - /* * OFDPA requires group-id's to have a certain form. * L2 Interface Groups have <4bits-0><12bits-vlanid><16bits-portid> * L3 Unicast Groups have <4bits-2><28bits-index> + * MPLS Interface Groups have <4bits-9><4bits:0><24bits-index> + * L3 ECMP Groups have <4bits-7><28bits-index> + * L2 Flood Groups have <4bits-4><12bits-vlanid><16bits-index> + * L3 VPN Groups have <4bits-9><4bits-2><24bits-index> */ private static final int L2INTERFACEMASK = 0x0; private static final int L3UNICASTMASK = 0x20000000; - //private static final int MPLSINTERFACEMASK = 0x90000000; + private static final int MPLSINTERFACEMASK = 0x90000000; private static final int L3ECMPMASK = 0x70000000; private static final int L2FLOODMASK = 0x40000000; + private static final int L3VPNMASK = 0x92000000; private final Logger log = getLogger(getClass()); private ServiceDirectory serviceDirectory; protected FlowRuleService flowRuleService; private CoreService coreService; - private GroupService groupService; - private FlowObjectiveStore flowObjectiveStore; + protected GroupService groupService; + protected FlowObjectiveStore flowObjectiveStore; protected DeviceId deviceId; protected ApplicationId driverId; protected PacketService packetService; protected DeviceService deviceService; private InternalPacketProcessor processor = new InternalPacketProcessor(); - private KryoNamespace appKryo = new KryoNamespace.Builder() + protected KryoNamespace appKryo = new KryoNamespace.Builder() .register(KryoNamespaces.API) .register(GroupKey.class) .register(DefaultGroupKey.class) - .register(OfdpaGroupChain.class) + .register(OfdpaNextGroup.class) .register(byte[].class) + .register(ArrayDeque.class) .build(); - private Cache pendingNextObjectives; - private ConcurrentHashMap pendingGroups; + private Cache pendingNextObjectives; + private ConcurrentHashMap> pendingGroups; private ScheduledExecutorService groupChecker = Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", @@ -184,6 +184,8 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline Map> vlan2Port = new ConcurrentHashMap>(); + // index number for group creation + AtomicInteger l3vpnindex = new AtomicInteger(0); @Override @@ -193,15 +195,16 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline pendingNextObjectives = CacheBuilder.newBuilder() .expireAfterWrite(20, TimeUnit.SECONDS) - .removalListener((RemovalNotification notification) -> { - if (notification.getCause() == RemovalCause.EXPIRED) { - fail(notification.getValue().nextObjective(), - ObjectiveError.GROUPINSTALLATIONFAILED); - } + .removalListener(( + RemovalNotification notification) -> { + if (notification.getCause() == RemovalCause.EXPIRED) { + fail(notification.getValue().nextObjective(), + ObjectiveError.GROUPINSTALLATIONFAILED); + } }).build(); groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS); - pendingGroups = new ConcurrentHashMap(); + pendingGroups = new ConcurrentHashMap>(); coreService = serviceDirectory.get(CoreService.class); flowRuleService = serviceDirectory.get(FlowRuleService.class); @@ -285,22 +288,49 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline @Override public void next(NextObjective nextObjective) { - log.debug("Processing NextObjective id{} op{}", nextObjective.id(), - nextObjective.op()); - if (nextObjective.op() == Objective.Operation.REMOVE) { - if (nextObjective.next().isEmpty()) { - removeGroup(nextObjective); - } else { - removeBucketFromGroup(nextObjective); + NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id()); + switch (nextObjective.op()) { + case ADD: + if (nextGroup != null) { + log.warn("Cannot add next {} that already exists in device {}", + nextObjective.id(), deviceId); + return; } - } else if (nextObjective.op() == Objective.Operation.ADD) { - NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id()); + log.debug("Processing NextObjective id{} in dev{} - add group", + nextObjective.id(), deviceId); + addGroup(nextObjective); + break; + case ADD_TO_EXISTING: if (nextGroup != null) { + log.debug("Processing NextObjective id{} in dev{} - add bucket", + nextObjective.id(), deviceId); addBucketToGroup(nextObjective); } else { - addGroup(nextObjective); + // it is possible that group-chain has not been fully created yet + waitToAddBucketToGroup(nextObjective); } - } else { + break; + case REMOVE: + if (nextGroup == null) { + log.warn("Cannot remove next {} that does not exist in device {}", + nextObjective.id(), deviceId); + return; + } + log.debug("Processing NextObjective id{} in dev{} - remove group", + nextObjective.id(), deviceId); + removeGroup(nextObjective); + break; + case REMOVE_FROM_EXISTING: + if (nextGroup == null) { + log.warn("Cannot remove from next {} that does not exist in device {}", + nextObjective.id(), deviceId); + return; + } + log.debug("Processing NextObjective id{} in dev{} - remove bucket", + nextObjective.id(), deviceId); + removeBucketFromGroup(nextObjective); + break; + default: log.warn("Unsupported operation {}", nextObjective.op()); } } @@ -309,7 +339,6 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline // Flow handling ////////////////////////////////////// - /** * As per OFDPA 2.0 TTP, filtering of VLAN ids, MAC addresses (for routing) * and IP addresses configured on switch ports happen in different tables. @@ -520,7 +549,6 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline /** * Allows routed packets with correct destination MAC to be directed * to unicast-IP routing table or MPLS forwarding table. - * XXX need to add rule for multicast routing. * * @param portCriterion port on device for which this filter is programmed * @param ethCriterion dstMac of device for which is filter is programmed @@ -661,38 +689,78 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline /** * In the OF-DPA 2.0 pipeline, specific forwarding refers to the IP table - * (unicast or multicast) or the L2 table (mac + vlan). + * (unicast or multicast) or the L2 table (mac + vlan) or the MPLS table. * * @param fwd the forwarding objective of type 'specific' * @return a collection of flow rules. Typically there will be only one * for this type of forwarding objective. An empty set may be * returned if there is an issue in processing the objective. */ - private Collection processSpecific(ForwardingObjective fwd) { - log.debug("Processing specific forwarding objective"); + protected Collection processSpecific(ForwardingObjective fwd) { TrafficSelector selector = fwd.selector(); EthTypeCriterion ethType = (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE); - // XXX currently supporting only the L3 unicast table - if (ethType == null || ethType.ethType().toShort() != Ethernet.TYPE_IPV4) { + if ((ethType == null) || + (ethType.ethType().toShort() != Ethernet.TYPE_IPV4) && + (ethType.ethType().toShort() != Ethernet.MPLS_UNICAST)) { + log.warn("processSpecific: Unsupported " + + "forwarding objective criteraia"); fail(fwd, ObjectiveError.UNSUPPORTED); return Collections.emptySet(); } - TrafficSelector filteredSelector = - DefaultTrafficSelector.builder() - .matchEthType(Ethernet.TYPE_IPV4) - .matchIPDst( - ((IPCriterion) - selector.getCriterion(Criterion.Type.IPV4_DST)).ip()) - .build(); + int forTableId = -1; + TrafficSelector.Builder filteredSelector = DefaultTrafficSelector.builder(); + if (ethType.ethType().toShort() == Ethernet.TYPE_IPV4) { + filteredSelector.matchEthType(Ethernet.TYPE_IPV4) + .matchIPDst(((IPCriterion) + selector.getCriterion(Criterion.Type.IPV4_DST)).ip()); + forTableId = UNICAST_ROUTING_TABLE; + log.debug("processing IPv4 specific forwarding objective {} in dev:{}", + fwd.id(), deviceId); + } else { + filteredSelector + .matchEthType(Ethernet.MPLS_UNICAST) + .matchMplsLabel(((MplsCriterion) + selector.getCriterion(Criterion.Type.MPLS_LABEL)).label()); + MplsBosCriterion bos = (MplsBosCriterion) selector + .getCriterion(Criterion.Type.MPLS_BOS); + if (bos != null) { + filteredSelector.matchMplsBos(bos.mplsBos()); + } + forTableId = MPLS_TABLE_1; + log.debug("processing MPLS specific forwarding objective {} in dev {}", + fwd.id(), deviceId); + } TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder(); + boolean popMpls = false; + if (fwd.treatment() != null) { + for (Instruction i : fwd.treatment().allInstructions()) { + tb.add(i); + if (i instanceof L2ModificationInstruction && + ((L2ModificationInstruction) i).subtype() == L2SubType.MPLS_POP) { + popMpls = true; + } + } + } if (fwd.nextId() != null) { + if (forTableId == MPLS_TABLE_1 && !popMpls) { + log.warn("SR CONTINUE case cannot be handled as MPLS ECMP " + + "is not implemented in OF-DPA yet. Aborting this flow " + + "in this device {}", deviceId); + // XXX We could convert to forwarding to a single-port, via a + // MPLS interface, or a MPLS SWAP (with-same) but that would + // have to be handled in the next-objective. Also the pop-mpls + // logic used here won't work in non-BoS case. + return Collections.emptySet(); + } + NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId()); - List gkeys = appKryo.deserialize(next.data()); - Group group = groupService.getGroup(deviceId, gkeys.get(0)); + List> gkeys = appKryo.deserialize(next.data()); + // we only need the top level group's key to point the flow to it + Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst()); if (group == null) { log.warn("The group left!"); fail(fwd, ObjectiveError.GROUPMISSING); @@ -705,8 +773,9 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline .fromApp(fwd.appId()) .withPriority(fwd.priority()) .forDevice(deviceId) - .withSelector(filteredSelector) - .withTreatment(tb.build()); + .withSelector(filteredSelector.build()) + .withTreatment(tb.build()) + .forTable(forTableId); if (fwd.permanent()) { ruleBuilder.makePermanent(); @@ -714,7 +783,6 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline ruleBuilder.makeTemporary(fwd.timeout()); } - ruleBuilder.forTable(UNICAST_ROUTING_TABLE); return Collections.singletonList(ruleBuilder.build()); } @@ -724,7 +792,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline } } - private void fail(Objective obj, ObjectiveError error) { + protected void fail(Objective obj, ObjectiveError error) { if (obj.context().isPresent()) { obj.context().get().onError(obj, error); } @@ -765,20 +833,66 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline /** * As per the OFDPA 2.0 TTP, packets are sent out of ports by using - * a chain of groups, namely an L3 Unicast Group that points to an L2 Interface - * Group which in-turn points to an output port. The Next Objective passed + * a chain of groups. The simple Next Objective passed * in by the application has to be broken up into a group chain - * to satisfy this TTP. + * comprising of an L3 Unicast Group that points to an L2 Interface + * Group which in-turn points to an output port. In some cases, the simple + * next Objective can just be an L2 interface without the need for chaining. * * @param nextObj the nextObjective of type SIMPLE */ private void processSimpleNextObjective(NextObjective nextObj) { // break up simple next objective to GroupChain objects TrafficTreatment treatment = nextObj.next().iterator().next(); + + GroupInfo groupInfo = createL2L3Chain(treatment, nextObj.id(), + nextObj.appId(), false, + nextObj.meta()); + if (groupInfo == null) { + log.error("Could not process nextObj={} in dev:{}", nextObj.id(), deviceId); + return; + } + // create object for local and distributed storage + Deque gkeyChain = new ArrayDeque<>(); + gkeyChain.addFirst(groupInfo.innerGrpDesc.appCookie()); + gkeyChain.addFirst(groupInfo.outerGrpDesc.appCookie()); + OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup( + Collections.singletonList(gkeyChain), + nextObj); + + // store l3groupkey with the ofdpaGroupChain for the nextObjective that depends on it + pendingNextObjectives.put(groupInfo.outerGrpDesc.appCookie(), ofdpaGrp); + + // now we are ready to send the l2 groupDescription (inner), as all the stores + // that will get async replies have been updated. By waiting to update + // the stores, we prevent nasty race conditions. + groupService.addGroup(groupInfo.innerGrpDesc); + } + + /** + * Creates one of two possible group-chains from the treatment + * passed in. Depending on the MPLS boolean, this method either creates + * an L3Unicast Group --> L2Interface Group, if mpls is false; + * or MPLSInterface Group --> L2Interface Group, if mpls is true; + * The returned 'inner' group description is always the L2 Interface group. + * + * @param treatment that needs to be broken up to create the group chain + * @param nextId of the next objective that needs this group chain + * @param appId of the application that sent this next objective + * @param mpls determines if L3Unicast or MPLSInterface group is created + * @param meta metadata passed in by the application as part of the nextObjective + * @return GroupInfo containing the GroupDescription of the + * L2Interface group(inner) and the GroupDescription of the (outer) + * L3Unicast/MPLSInterface group. May return null if there is an + * error in processing the chain + */ + private GroupInfo createL2L3Chain(TrafficTreatment treatment, int nextId, + ApplicationId appId, boolean mpls, + TrafficSelector meta) { // for the l2interface group, get vlan and port info - // for the l3unicast group, get the src/dst mac and vlan info - TrafficTreatment.Builder l3utt = DefaultTrafficTreatment.builder(); - TrafficTreatment.Builder l2itt = DefaultTrafficTreatment.builder(); + // for the outer group, get the src/dst mac, and vlan info + TrafficTreatment.Builder outerTtb = DefaultTrafficTreatment.builder(); + TrafficTreatment.Builder innerTtb = DefaultTrafficTreatment.builder(); VlanId vlanid = null; long portNum = 0; for (Instruction ins : treatment.allInstructions()) { @@ -786,76 +900,144 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline L2ModificationInstruction l2ins = (L2ModificationInstruction) ins; switch (l2ins.subtype()) { case ETH_DST: - l3utt.setEthDst(((ModEtherInstruction) l2ins).mac()); + outerTtb.setEthDst(((ModEtherInstruction) l2ins).mac()); break; case ETH_SRC: - l3utt.setEthSrc(((ModEtherInstruction) l2ins).mac()); + outerTtb.setEthSrc(((ModEtherInstruction) l2ins).mac()); break; case VLAN_ID: vlanid = ((ModVlanIdInstruction) l2ins).vlanId(); - l3utt.setVlanId(vlanid); + outerTtb.setVlanId(vlanid); + break; + case VLAN_POP: + innerTtb.popVlan(); break; case DEC_MPLS_TTL: case MPLS_LABEL: case MPLS_POP: case MPLS_PUSH: case VLAN_PCP: - case VLAN_POP: case VLAN_PUSH: default: break; } } else if (ins.type() == Instruction.Type.OUTPUT) { portNum = ((OutputInstruction) ins).port().toLong(); - l2itt.add(ins); + innerTtb.add(ins); } else { log.warn("Driver does not handle this type of TrafficTreatment" + " instruction in nextObjectives: {}", ins.type()); } } + if (vlanid == null) { + //use the vlanid associated with the port + vlanid = port2Vlan.get(PortNumber.portNumber(portNum)); + } + + if (vlanid == null) { + // use metadata + for (Criterion metaCriterion : meta.criteria()) { + if (metaCriterion.type() == Type.VLAN_VID) { + vlanid = ((VlanIdCriterion) metaCriterion).vlanId(); + } + } + } + + if (vlanid == null) { + log.error("Driver cannot process an L2/L3 group chain without " + + "egress vlan information for dev: {} port:{}", + deviceId, portNum); + return null; + } + // assemble information for ofdpa l2interface group - int l2gk = nextObj.id() | GROUP1MASK; - final GroupKey l2groupkey = new DefaultGroupKey(appKryo.serialize(l2gk)); Integer l2groupId = L2INTERFACEMASK | (vlanid.toShort() << 16) | (int) portNum; + // a globally unique groupkey that is different for ports in the same devices + // but different for the same portnumber on different devices. Also different + // for the various group-types created out of the same next objective. + int l2gk = 0x0ffffff & (deviceId.hashCode() << 8 | (int) portNum); + final GroupKey l2groupkey = new DefaultGroupKey(appKryo.serialize(l2gk)); - // assemble information for ofdpa l3unicast group - int l3gk = nextObj.id() | GROUP0MASK; - final GroupKey l3groupkey = new DefaultGroupKey(appKryo.serialize(l3gk)); - Integer l3groupId = L3UNICASTMASK | (int) portNum; - l3utt.group(new DefaultGroupId(l2groupId)); - GroupChainElem gce = new GroupChainElem(l3groupkey, l3groupId, - GroupDescription.Type.INDIRECT, - Collections.singletonList(l3utt.build()), - nextObj.appId(), 1); - - // create object for local and distributed storage - List gkeys = new ArrayList(); - gkeys.add(l3groupkey); // group0 in chain - gkeys.add(l2groupkey); // group1 in chain - OfdpaGroupChain ofdpaGrp = new OfdpaGroupChain(gkeys, nextObj); + // assemble information for outer group + GroupDescription outerGrpDesc = null; + if (mpls) { + // outer group is MPLSInteface + Integer mplsgroupId = MPLSINTERFACEMASK | (int) portNum; + // using mplsinterfacemask in groupkey to differentiate from l2interface + int mplsgk = MPLSINTERFACEMASK | (0x0ffffff & (deviceId.hashCode() << 8 | (int) portNum)); + final GroupKey mplsgroupkey = new DefaultGroupKey(appKryo.serialize(mplsgk)); + outerTtb.group(new DefaultGroupId(l2groupId)); + // create the mpls-interface group description to wait for the + // l2 interface group to be processed + GroupBucket mplsinterfaceGroupBucket = + DefaultGroupBucket.createIndirectGroupBucket(outerTtb.build()); + outerGrpDesc = new DefaultGroupDescription( + deviceId, + GroupDescription.Type.INDIRECT, + new GroupBuckets(Collections.singletonList( + mplsinterfaceGroupBucket)), + mplsgroupkey, + mplsgroupId, + appId); + log.debug("Trying MPLS-Interface: device:{} gid:{} gkey:{} nextid:{}", + deviceId, Integer.toHexString(mplsgroupId), + mplsgroupkey, nextId); + } else { + // outer group is L3Unicast + Integer l3groupId = L3UNICASTMASK | (int) portNum; + int l3gk = L3UNICASTMASK | (0x0ffffff & (deviceId.hashCode() << 8 | (int) portNum)); + final GroupKey l3groupkey = new DefaultGroupKey(appKryo.serialize(l3gk)); + outerTtb.group(new DefaultGroupId(l2groupId)); + // create the l3unicast group description to wait for the + // l2 interface group to be processed + GroupBucket l3unicastGroupBucket = + DefaultGroupBucket.createIndirectGroupBucket(outerTtb.build()); + outerGrpDesc = new DefaultGroupDescription( + deviceId, + GroupDescription.Type.INDIRECT, + new GroupBuckets(Collections.singletonList( + l3unicastGroupBucket)), + l3groupkey, + l3groupId, + appId); + log.debug("Trying L3Unicast: device:{} gid:{} gkey:{} nextid:{}", + deviceId, Integer.toHexString(l3groupId), + l3groupkey, nextId); + } - // store l2groupkey with the groupChainElem for the l3group that depends on it - pendingGroups.put(l2groupkey, gce); + // store l2groupkey with the groupChainElem for the outer-group that depends on it + GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1); + Set gceSet = Collections.newSetFromMap( + new ConcurrentHashMap()); + gceSet.add(gce); + Set retval = pendingGroups.putIfAbsent(l2groupkey, gceSet); + if (retval != null) { + retval.add(gce); + } - // store l3groupkey with the ofdpaGroupChain for the nextObjective that depends on it - pendingNextObjectives.put(l3groupkey, ofdpaGrp); + // create group description for the inner l2interfacegroup + GroupBucket l2interfaceGroupBucket = + DefaultGroupBucket.createIndirectGroupBucket(innerTtb.build()); + GroupDescription l2groupDescription = + new DefaultGroupDescription( + deviceId, + GroupDescription.Type.INDIRECT, + new GroupBuckets(Collections.singletonList( + l2interfaceGroupBucket)), + l2groupkey, + l2groupId, + appId); + log.debug("Trying L2Interface: device:{} gid:{} gkey:{} nextId:{}", + deviceId, Integer.toHexString(l2groupId), + l2groupkey, nextId); + return new GroupInfo(l2groupDescription, outerGrpDesc); - // create group description for the ofdpa l2interfacegroup and send to groupservice - GroupBucket bucket = - DefaultGroupBucket.createIndirectGroupBucket(l2itt.build()); - GroupDescription groupDescription = new DefaultGroupDescription(deviceId, - GroupDescription.Type.INDIRECT, - new GroupBuckets(Collections.singletonList(bucket)), - l2groupkey, - l2groupId, - nextObj.appId()); - groupService.addGroup(groupDescription); } /** * As per the OFDPA 2.0 TTP, packets are sent out of ports by using - * a chain of groups. The Next Objective passed in by the application + * a chain of groups. The broadcast Next Objective passed in by the application * has to be broken up into a group chain comprising of an * L2 Flood group whose buckets point to L2 Interface groups. * @@ -866,9 +1048,9 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline Collection buckets = nextObj.next(); // each treatment is converted to an L2 interface group - int indicator = 0; VlanId vlanid = null; - List groupInfoCollection = new ArrayList<>(); + List l2interfaceGroupDescs = new ArrayList<>(); + List> allGroupKeys = new ArrayList<>(); for (TrafficTreatment treatment : buckets) { TrafficTreatment.Builder newTreatment = DefaultTrafficTreatment.builder(); PortNumber portNum = null; @@ -907,83 +1089,284 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline } } - // assemble info for all l2 interface groups - indicator += GROUP1MASK; - int l2gk = nextObj.id() | indicator; + // assemble info for l2 interface group + int l2gk = 0x0ffffff & (deviceId.hashCode() << 8 | (int) portNum.toLong()); final GroupKey l2groupkey = new DefaultGroupKey(appKryo.serialize(l2gk)); Integer l2groupId = L2INTERFACEMASK | (vlanid.toShort() << 16) | (int) portNum.toLong(); - GroupBucket newbucket = + GroupBucket l2interfaceGroupBucket = DefaultGroupBucket.createIndirectGroupBucket(newTreatment.build()); + GroupDescription l2interfaceGroupDescription = + new DefaultGroupDescription( + deviceId, + GroupDescription.Type.INDIRECT, + new GroupBuckets(Collections.singletonList( + l2interfaceGroupBucket)), + l2groupkey, + l2groupId, + nextObj.appId()); + log.debug("Trying L2-Interface: device:{} gid:{} gkey:{} nextid:{}", + deviceId, Integer.toHexString(l2groupId), + l2groupkey, nextObj.id()); + + Deque gkeyChain = new ArrayDeque<>(); + gkeyChain.addFirst(l2groupkey); // store the info needed to create this group - groupInfoCollection.add(new GroupInfo(l2groupId, l2groupkey, newbucket)); + l2interfaceGroupDescs.add(l2interfaceGroupDescription); + allGroupKeys.add(gkeyChain); } // assemble info for l2 flood group - int l2floodgk = nextObj.id() | GROUP0MASK; - final GroupKey l2floodgroupkey = new DefaultGroupKey(appKryo.serialize(l2floodgk)); Integer l2floodgroupId = L2FLOODMASK | (vlanid.toShort() << 16) | nextObj.id(); - // collection of treatment with groupids of l2 interface groups - List floodtt = new ArrayList<>(); - for (GroupInfo gi : groupInfoCollection) { + int l2floodgk = L2FLOODMASK | nextObj.id() << 12; + final GroupKey l2floodgroupkey = new DefaultGroupKey(appKryo.serialize(l2floodgk)); + // collection of group buckets pointing to all the l2 interface groups + List l2floodBuckets = new ArrayList<>(); + for (GroupDescription l2intGrpDesc : l2interfaceGroupDescs) { TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder(); - ttb.group(new DefaultGroupId(gi.groupId)); - floodtt.add(ttb.build()); + ttb.group(new DefaultGroupId(l2intGrpDesc.givenGroupId())); + GroupBucket abucket = DefaultGroupBucket.createAllGroupBucket(ttb.build()); + l2floodBuckets.add(abucket); } - GroupChainElem gce = new GroupChainElem(l2floodgroupkey, l2floodgroupId, - GroupDescription.Type.ALL, - floodtt, - nextObj.appId(), - groupInfoCollection.size()); + // create the l2flood group-description to wait for all the + // l2interface groups to be processed + GroupDescription l2floodGroupDescription = + new DefaultGroupDescription( + deviceId, + GroupDescription.Type.ALL, + new GroupBuckets(l2floodBuckets), + l2floodgroupkey, + l2floodgroupId, + nextObj.appId()); + GroupChainElem gce = new GroupChainElem(l2floodGroupDescription, + l2interfaceGroupDescs.size()); + log.debug("Trying L2-Flood: device:{} gid:{} gkey:{} nextid:{}", + deviceId, Integer.toHexString(l2floodgroupId), + l2floodgroupkey, nextObj.id()); // create objects for local and distributed storage - List gkeys = new ArrayList(); - gkeys.add(l2floodgroupkey); // group0 in chain - OfdpaGroupChain ofdpaGrp = new OfdpaGroupChain(gkeys, nextObj); + allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l2floodgroupkey)); + OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj); // store l2floodgroupkey with the ofdpaGroupChain for the nextObjective // that depends on it pendingNextObjectives.put(l2floodgroupkey, ofdpaGrp); - for (GroupInfo gi : groupInfoCollection) { + for (GroupDescription l2intGrpDesc : l2interfaceGroupDescs) { // store all l2groupkeys with the groupChainElem for the l2floodgroup // that depends on it - pendingGroups.put(gi.groupKey, gce); + Set gceSet = Collections.newSetFromMap( + new ConcurrentHashMap()); + gceSet.add(gce); + Set retval = pendingGroups.putIfAbsent( + l2intGrpDesc.appCookie(), gceSet); + if (retval != null) { + retval.add(gce); + } // create and send groups for all l2 interface groups - GroupDescription groupDescription = - new DefaultGroupDescription( - deviceId, - GroupDescription.Type.INDIRECT, - new GroupBuckets(Collections.singletonList(gi.groupBucket)), - gi.groupKey, - gi.groupId, - nextObj.appId()); - groupService.addGroup(groupDescription); + groupService.addGroup(l2intGrpDesc); } } + /** + * Utility class for moving group information around. + * + */ private class GroupInfo { - private Integer groupId; - private GroupKey groupKey; - private GroupBucket groupBucket; - - GroupInfo(Integer groupId, GroupKey groupKey, GroupBucket groupBucket) { - this.groupBucket = groupBucket; - this.groupId = groupId; - this.groupKey = groupKey; + private GroupDescription innerGrpDesc; + private GroupDescription outerGrpDesc; + + GroupInfo(GroupDescription innerGrpDesc, GroupDescription outerGrpDesc) { + this.innerGrpDesc = innerGrpDesc; + this.outerGrpDesc = outerGrpDesc; } } + /** + * As per the OFDPA 2.0 TTP, packets are sent out of ports by using + * a chain of groups. The hashed Next Objective passed in by the application + * has to be broken up into a group chain comprising of an + * L3 ECMP group as the top level group. Buckets of this group can point + * to a variety of groups in a group chain, depending on the whether + * MPLS labels are being pushed or not. + *

+ * NOTE: We do not create MPLS ECMP groups as they are unimplemented in + * OF-DPA 2.0 (even though it is in the spec). Therefore we do not + * check the nextObjective meta. + * + * @param nextObj the nextObjective of type HASHED + */ private void processHashedNextObjective(NextObjective nextObj) { - // TODO Auto-generated method stub + // break up hashed next objective to multiple groups + Collection buckets = nextObj.next(); + + // storage for all group keys in the chain of groups created + List> allGroupKeys = new ArrayList<>(); + List unsentGroups = new ArrayList<>(); + for (TrafficTreatment bucket : buckets) { + //figure out how many labels are pushed in each bucket + int labelsPushed = 0; + MplsLabel innermostLabel = null; + for (Instruction ins : bucket.allInstructions()) { + if (ins.type() == Instruction.Type.L2MODIFICATION) { + L2ModificationInstruction l2ins = (L2ModificationInstruction) ins; + if (l2ins.subtype() == L2SubType.MPLS_PUSH) { + labelsPushed++; + } + if (l2ins.subtype() == L2SubType.MPLS_LABEL) { + if (innermostLabel == null) { + innermostLabel = ((ModMplsLabelInstruction) l2ins).mplsLabel(); + } + } + } + } + + Deque gkeyChain = new ArrayDeque<>(); + // XXX we only deal with 0 and 1 label push right now + if (labelsPushed == 0) { + GroupInfo nolabelGroupInfo = createL2L3Chain(bucket, nextObj.id(), + nextObj.appId(), false, + nextObj.meta()); + if (nolabelGroupInfo == null) { + log.error("Could not process nextObj={} in dev:{}", + nextObj.id(), deviceId); + return; + } + gkeyChain.addFirst(nolabelGroupInfo.innerGrpDesc.appCookie()); + gkeyChain.addFirst(nolabelGroupInfo.outerGrpDesc.appCookie()); + + // we can't send the inner group description yet, as we have to + // create the dependent ECMP group first. So we store.. + unsentGroups.add(nolabelGroupInfo); + + } else if (labelsPushed == 1) { + GroupInfo onelabelGroupInfo = createL2L3Chain(bucket, nextObj.id(), + nextObj.appId(), true, + nextObj.meta()); + if (onelabelGroupInfo == null) { + log.error("Could not process nextObj={} in dev:{}", + nextObj.id(), deviceId); + return; + } + // we need to add another group to this chain - the L3VPN group + TrafficTreatment.Builder l3vpnTtb = DefaultTrafficTreatment.builder(); + l3vpnTtb.pushMpls() + .setMpls(innermostLabel) + .setMplsBos(true) + .copyTtlOut() + .group(new DefaultGroupId( + onelabelGroupInfo.outerGrpDesc.givenGroupId())); + GroupBucket l3vpnGrpBkt = + DefaultGroupBucket.createIndirectGroupBucket(l3vpnTtb.build()); + int l3vpngroupId = L3VPNMASK | l3vpnindex.incrementAndGet(); + int l3vpngk = L3VPNMASK | nextObj.id() << 12 | l3vpnindex.get(); + GroupKey l3vpngroupkey = new DefaultGroupKey(appKryo.serialize(l3vpngk)); + GroupDescription l3vpnGroupDesc = + new DefaultGroupDescription( + deviceId, + GroupDescription.Type.INDIRECT, + new GroupBuckets(Collections.singletonList( + l3vpnGrpBkt)), + l3vpngroupkey, + l3vpngroupId, + nextObj.appId()); + GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1); + Set gceSet = Collections.newSetFromMap( + new ConcurrentHashMap()); + gceSet.add(l3vpnGce); + Set retval = pendingGroups + .putIfAbsent(onelabelGroupInfo.outerGrpDesc.appCookie(), gceSet); + if (retval != null) { + retval.add(l3vpnGce); + } + + gkeyChain.addFirst(onelabelGroupInfo.innerGrpDesc.appCookie()); + gkeyChain.addFirst(onelabelGroupInfo.outerGrpDesc.appCookie()); + gkeyChain.addFirst(l3vpngroupkey); + + //now we can replace the outerGrpDesc with the one we just created + onelabelGroupInfo.outerGrpDesc = l3vpnGroupDesc; + + // we can't send the innermost group yet, as we have to create + // the dependent ECMP group first. So we store ... + unsentGroups.add(onelabelGroupInfo); + + log.debug("Trying L3VPN: device:{} gid:{} gkey:{} nextId:{}", + deviceId, Integer.toHexString(l3vpngroupId), + l3vpngroupkey, nextObj.id()); + + } else { + log.warn("Driver currently does not handle more than 1 MPLS " + + "labels. Not processing nextObjective {}", nextObj); + return; + } + + // all groups in this chain + allGroupKeys.add(gkeyChain); + } + + // now we can create the outermost L3 ECMP group + List l3ecmpGroupBuckets = new ArrayList<>(); + for (GroupInfo gi : unsentGroups) { + // create ECMP bucket to point to the outer group + TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder(); + ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId())); + GroupBucket sbucket = DefaultGroupBucket + .createSelectGroupBucket(ttb.build()); + l3ecmpGroupBuckets.add(sbucket); + } + int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12; + GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId)); + GroupDescription l3ecmpGroupDesc = + new DefaultGroupDescription( + deviceId, + GroupDescription.Type.SELECT, + new GroupBuckets(l3ecmpGroupBuckets), + l3ecmpGroupKey, + l3ecmpGroupId, + nextObj.appId()); + GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc, + l3ecmpGroupBuckets.size()); + + // create objects for local and distributed storage + allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey)); + OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj); + + // store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective + // that depends on it + pendingNextObjectives.put(l3ecmpGroupKey, ofdpaGrp); + + log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}", + deviceId, Integer.toHexString(l3ecmpGroupId), + l3ecmpGroupKey, nextObj.id()); + // finally we are ready to send the innermost groups + for (GroupInfo gi : unsentGroups) { + log.debug("Sending innermost group {} in group chain on device {} ", + Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId); + Set gceSet = Collections.newSetFromMap( + new ConcurrentHashMap()); + gceSet.add(l3ecmpGce); + Set retval = pendingGroups + .putIfAbsent(gi.outerGrpDesc.appCookie(), gceSet); + if (retval != null) { + retval.add(l3ecmpGce); + } + + groupService.addGroup(gi.innerGrpDesc); + } + } private void addBucketToGroup(NextObjective nextObjective) { // TODO Auto-generated method stub } + private void waitToAddBucketToGroup(NextObjective nextObjective) { + // TODO Auto-generated method stub + } + private void removeBucketFromGroup(NextObjective nextObjective) { // TODO Auto-generated method stub } @@ -1009,45 +1392,11 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline private void processGroupChain(GroupChainElem gce) { int waitOnGroups = gce.decrementAndGetGroupsWaitedOn(); if (waitOnGroups != 0) { - log.debug("GCE: {} waiting on {} groups. Not processing yet", - gce, waitOnGroups); + log.debug("GCE: {} not ready to be processed", gce); return; } - List buckets = new ArrayList<>(); - switch (gce.groupType) { - case INDIRECT: - GroupBucket ibucket = DefaultGroupBucket - .createIndirectGroupBucket(gce.bucketActions.iterator().next()); - buckets.add(ibucket); - break; - case ALL: - for (TrafficTreatment tt : gce.bucketActions) { - GroupBucket abucket = DefaultGroupBucket - .createAllGroupBucket(tt); - buckets.add(abucket); - } - break; - case SELECT: - for (TrafficTreatment tt : gce.bucketActions) { - GroupBucket sbucket = DefaultGroupBucket - .createSelectGroupBucket(tt); - buckets.add(sbucket); - } - break; - case FAILOVER: - default: - log.error("Unknown or unimplemented GroupChainElem {}", gce); - } - - if (buckets.size() > 0) { - GroupDescription groupDesc = new DefaultGroupDescription( - deviceId, gce.groupType, - new GroupBuckets(buckets), - gce.gkey, - gce.givenGroupId, - gce.appId); - groupService.addGroup(groupDesc); - } + log.debug("GCE: {} ready to be processed", gce); + groupService.addGroup(gce.groupDescription); } private class GroupChecker implements Runnable { @@ -1063,19 +1412,23 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline keys.stream().forEach(key -> { //first check for group chain - GroupChainElem gce = pendingGroups.remove(key); - if (gce != null) { - log.info("Group service processed group key {}. Processing next " - + "group in group chain with group key {}", - appKryo.deserialize(key.key()), - appKryo.deserialize(gce.gkey.key())); - processGroupChain(gce); + Set gceSet = pendingGroups.remove(key); + if (gceSet != null) { + for (GroupChainElem gce : gceSet) { + log.info("Group service processed group key {} in device {}. " + + "Processing next group in group chain with group id {}", + key, deviceId, + Integer.toHexString(gce.groupDescription.givenGroupId())); + processGroupChain(gce); + } } else { - OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key); - log.info("Group service processed group key {}. Done implementing " - + "next objective: {}", appKryo.deserialize(key.key()), - obj.nextObjective().id()); + OfdpaNextGroup obj = pendingNextObjectives.getIfPresent(key); if (obj != null) { + log.info("Group service processed group key {} in device:{}. " + + "Done implementing next objective: {} <<-->> gid:{}", + key, deviceId, obj.nextObjective().id(), + Integer.toHexString(groupService.getGroup(deviceId, key) + .givenGroupId())); pass(obj.nextObjective()); pendingNextObjectives.invalidate(key); flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj); @@ -1088,23 +1441,27 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline private class InnerGroupListener implements GroupListener { @Override public void event(GroupEvent event) { - log.debug("received group event of type {}", event.type()); + log.trace("received group event of type {}", event.type()); if (event.type() == GroupEvent.Type.GROUP_ADDED) { GroupKey key = event.subject().appCookie(); // first check for group chain - GroupChainElem gce = pendingGroups.remove(key); - if (gce != null) { - log.info("group ADDED with group key {} .. " - + "Processing next group in group chain with group key {}", - appKryo.deserialize(key.key()), - appKryo.deserialize(gce.gkey.key())); - processGroupChain(gce); + Set gceSet = pendingGroups.remove(key); + if (gceSet != null) { + for (GroupChainElem gce : gceSet) { + log.info("group ADDED with group key {} .. " + + "Processing next group in group chain with group key {}", + key, + gce.groupDescription.appCookie()); + processGroupChain(gce); + } } else { - OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key); + OfdpaNextGroup obj = pendingNextObjectives.getIfPresent(key); if (obj != null) { - log.info("group ADDED with key {}.. Done implementing next " - + "objective: {}", - appKryo.deserialize(key.key()), obj.nextObjective().id()); + log.info("group ADDED with key {} in dev {}.. Done implementing next " + + "objective: {} <<-->> gid:{}", + key, deviceId, obj.nextObjective().id(), + Integer.toHexString(groupService.getGroup(deviceId, key) + .givenGroupId())); pass(obj.nextObjective()); pendingNextObjectives.invalidate(key); flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj); @@ -1115,30 +1472,35 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline } /** - * Represents a group-chain that implements a Next-Objective from - * the application. Includes information about the next objective Id, and the - * group keys for the groups in the group chain. The chain is expected to - * look like group0 --> group 1 --> outPort. Information about the groups - * themselves can be fetched from the Group Service using the group keys from - * objects instantiating this class. + * Represents an entire group-chain that implements a Next-Objective from + * the application. The objective is represented as a list of deques, where + * each deque can is a separate chain of groups. + *

+ * For example, an ECMP group with 3 buckets, where each bucket points to + * a group chain of L3 Unicast and L2 interface groups will look like this: + *

    + *
  • List[0] is a Deque of GroupKeyECMP(first)-GroupKeyL3(middle)-GroupKeyL2(last) + *
  • List[1] is a Deque of GroupKeyECMP(first)-GroupKeyL3(middle)-GroupKeyL2(last) + *
  • List[2] is a Deque of GroupKeyECMP(first)-GroupKeyL3(middle)-GroupKeyL2(last) + *
+ * where the first element of each deque is the same, representing the + * top level ECMP group, while every other element represents a unique groupKey. + *

+ * Also includes information about the next objective that + * resulted in this group-chain. * - * XXX Revisit this - since the forwarding objective only ever needs the - * groupkey of the top-level group in the group chain, why store a series - * of groupkeys. Also the group-chain list only works for 1-to-1 chaining, - * not for 1-to-many chaining. */ - private class OfdpaGroupChain implements NextGroup { + private class OfdpaNextGroup implements NextGroup { private final NextObjective nextObj; - private final List gkeys; + private final List> gkeys; - /** expected group chain: group0 --> group1 --> port. */ - public OfdpaGroupChain(List gkeys, NextObjective nextObj) { + public OfdpaNextGroup(List> gkeys, NextObjective nextObj) { this.gkeys = gkeys; this.nextObj = nextObj; } @SuppressWarnings("unused") - public List groupKeys() { + public List> groupKey() { return gkeys; } @@ -1161,22 +1523,11 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline * preceding groups in the group chain to be created. */ private class GroupChainElem { - private Collection bucketActions; - private Integer givenGroupId; - private GroupDescription.Type groupType; - private GroupKey gkey; - private ApplicationId appId; + private GroupDescription groupDescription; private AtomicInteger waitOnGroups; - GroupChainElem(GroupKey gkey, Integer givenGroupId, - GroupDescription.Type groupType, - Collection tr, ApplicationId appId, - int waitOnGroups) { - this.bucketActions = tr; - this.givenGroupId = givenGroupId; - this.groupType = groupType; - this.gkey = gkey; - this.appId = appId; + GroupChainElem(GroupDescription groupDescription, int waitOnGroups) { + this.groupDescription = groupDescription; this.waitOnGroups = new AtomicInteger(waitOnGroups); } @@ -1194,7 +1545,10 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline @Override public String toString() { - return Integer.toHexString(givenGroupId); + return (Integer.toHexString(groupDescription.givenGroupId()) + + " groupKey: " + groupDescription.appCookie() + + " waiting-on-groups: " + waitOnGroups.get() + + " device: " + deviceId); } } -- cgit 1.2.3-korg