diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java | 1304 |
1 files changed, 1304 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java new file mode 100644 index 00000000..cf48dcb8 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java @@ -0,0 +1,1304 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.group.impl; + +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onlab.util.NewConcurrentHashMap; +import org.onosproject.cluster.ClusterService; +import org.onosproject.core.DefaultApplicationId; +import org.onosproject.core.DefaultGroupId; +import org.onosproject.core.GroupId; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.DeviceId; +import org.onosproject.net.MastershipRole; +import org.onosproject.net.PortNumber; +import org.onosproject.net.flow.DefaultTrafficTreatment; +import org.onosproject.net.flow.FlowRule; +import org.onosproject.net.flow.instructions.Instructions; +import org.onosproject.net.flow.instructions.L0ModificationInstruction; +import org.onosproject.net.flow.instructions.L2ModificationInstruction; +import org.onosproject.net.flow.instructions.L3ModificationInstruction; +import org.onosproject.net.group.DefaultGroup; +import org.onosproject.net.group.DefaultGroupBucket; +import org.onosproject.net.group.DefaultGroupDescription; +import org.onosproject.net.group.DefaultGroupKey; +import org.onosproject.net.group.Group; +import org.onosproject.net.group.Group.GroupState; +import org.onosproject.net.group.GroupBucket; +import org.onosproject.net.group.GroupBuckets; +import org.onosproject.net.group.GroupDescription; +import org.onosproject.net.group.GroupEvent; +import org.onosproject.net.group.GroupEvent.Type; +import org.onosproject.net.group.GroupKey; +import org.onosproject.net.group.GroupOperation; +import org.onosproject.net.group.GroupStore; +import org.onosproject.net.group.GroupStoreDelegate; +import org.onosproject.net.group.StoredGroupBucketEntry; +import org.onosproject.net.group.StoredGroupEntry; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.service.MultiValuedTimestamp; +import org.onosproject.store.serializers.DeviceIdSerializer; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.serializers.URISerializer; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapBuilder; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.StorageService; +import org.slf4j.Logger; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; +import static org.onlab.util.Tools.groupedThreads; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Manages inventory of group entries using trivial in-memory implementation. + */ +@Component(immediate = true) +@Service +public class DistributedGroupStore + extends AbstractStore<GroupEvent, GroupStoreDelegate> + implements GroupStore { + + private final Logger log = getLogger(getClass()); + + private final int dummyId = 0xffffffff; + private final GroupId dummyGroupId = new DefaultGroupId(dummyId); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + // Per device group table with (device id + app cookie) as key + private EventuallyConsistentMap<GroupStoreKeyMapKey, + StoredGroupEntry> groupStoreEntriesByKey = null; + // Per device group table with (device id + group id) as key + private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>> + groupEntriesById = new ConcurrentHashMap<>(); + private EventuallyConsistentMap<GroupStoreKeyMapKey, + StoredGroupEntry> auditPendingReqQueue = null; + private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>> + extraneousGroupEntriesById = new ConcurrentHashMap<>(); + private ExecutorService messageHandlingExecutor; + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1; + + private final HashMap<DeviceId, Boolean> deviceAuditStatus = + new HashMap<DeviceId, Boolean>(); + + private final AtomicInteger groupIdGen = new AtomicInteger(); + + private KryoNamespace.Builder kryoBuilder = null; + + private final AtomicLong sequenceNumber = new AtomicLong(0); + + @Activate + public void activate() { + kryoBuilder = new KryoNamespace.Builder() + .register(DefaultGroup.class, + DefaultGroupBucket.class, + DefaultGroupDescription.class, + DefaultGroupKey.class, + GroupDescription.Type.class, + Group.GroupState.class, + GroupBuckets.class, + DefaultGroupId.class, + GroupStoreMessage.class, + GroupStoreMessage.Type.class, + UpdateType.class, + GroupStoreMessageSubjects.class, + MultiValuedTimestamp.class, + GroupStoreKeyMapKey.class, + GroupStoreIdMapKey.class, + GroupStoreMapKey.class + ) + .register(new URISerializer(), URI.class) + .register(new DeviceIdSerializer(), DeviceId.class) + .register(PortNumber.class) + .register(DefaultApplicationId.class) + .register(DefaultTrafficTreatment.class, + Instructions.DropInstruction.class, + Instructions.OutputInstruction.class, + Instructions.GroupInstruction.class, + Instructions.TableTypeTransition.class, + FlowRule.Type.class, + L0ModificationInstruction.class, + L0ModificationInstruction.L0SubType.class, + L0ModificationInstruction.ModLambdaInstruction.class, + L2ModificationInstruction.class, + L2ModificationInstruction.L2SubType.class, + L2ModificationInstruction.ModEtherInstruction.class, + L2ModificationInstruction.PushHeaderInstructions.class, + L2ModificationInstruction.ModVlanIdInstruction.class, + L2ModificationInstruction.ModVlanPcpInstruction.class, + L2ModificationInstruction.ModMplsLabelInstruction.class, + L2ModificationInstruction.ModMplsTtlInstruction.class, + L3ModificationInstruction.class, + L3ModificationInstruction.L3SubType.class, + L3ModificationInstruction.ModIPInstruction.class, + L3ModificationInstruction.ModIPv6FlowLabelInstruction.class, + L3ModificationInstruction.ModTtlInstruction.class, + org.onlab.packet.MplsLabel.class + ) + .register(org.onosproject.cluster.NodeId.class) + .register(KryoNamespaces.BASIC) + .register(KryoNamespaces.MISC); + + messageHandlingExecutor = Executors. + newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, + groupedThreads("onos/store/group", + "message-handlers")); + + clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, + kryoBuilder.build()::deserialize, + this::process, + messageHandlingExecutor); + + log.debug("Creating EC map groupstorekeymap"); + EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry> + keyMapBuilder = storageService.eventuallyConsistentMapBuilder(); + + groupStoreEntriesByKey = keyMapBuilder + .withName("groupstorekeymap") + .withSerializer(kryoBuilder) + .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(), + sequenceNumber.getAndIncrement())) + .build(); + groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener()); + log.debug("Current size of groupstorekeymap:{}", + groupStoreEntriesByKey.size()); + + log.debug("Creating EC map pendinggroupkeymap"); + EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry> + auditMapBuilder = storageService.eventuallyConsistentMapBuilder(); + + auditPendingReqQueue = auditMapBuilder + .withName("pendinggroupkeymap") + .withSerializer(kryoBuilder) + .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(), + sequenceNumber.getAndIncrement())) + .build(); + log.debug("Current size of pendinggroupkeymap:{}", + auditPendingReqQueue.size()); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + groupStoreEntriesByKey.destroy(); + auditPendingReqQueue.destroy(); + log.info("Stopped"); + } + + private static NewConcurrentHashMap<GroupId, Group> + lazyEmptyExtraneousGroupIdTable() { + return NewConcurrentHashMap.<GroupId, Group>ifNeeded(); + } + + private static NewConcurrentHashMap<GroupId, StoredGroupEntry> + lazyEmptyGroupIdTable() { + return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded(); + } + + /** + * Returns the group store eventual consistent key map. + * + * @return Map representing group key table. + */ + private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> + getGroupStoreKeyMap() { + return groupStoreEntriesByKey; + } + + /** + * Returns the group id table for specified device. + * + * @param deviceId identifier of the device + * @return Map representing group key table of given device. + */ + private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) { + return createIfAbsentUnchecked(groupEntriesById, + deviceId, lazyEmptyGroupIdTable()); + } + + /** + * Returns the pending group request table. + * + * @return Map representing group key table. + */ + private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> + getPendingGroupKeyTable() { + return auditPendingReqQueue; + } + + /** + * Returns the extraneous group id table for specified device. + * + * @param deviceId identifier of the device + * @return Map representing group key table of given device. + */ + private ConcurrentMap<GroupId, Group> + getExtraneousGroupIdTable(DeviceId deviceId) { + return createIfAbsentUnchecked(extraneousGroupEntriesById, + deviceId, + lazyEmptyExtraneousGroupIdTable()); + } + + /** + * Returns the number of groups for the specified device in the store. + * + * @return number of groups for the specified device + */ + @Override + public int getGroupCount(DeviceId deviceId) { + return (getGroups(deviceId) != null) ? + Iterables.size(getGroups(deviceId)) : 0; + } + + /** + * Returns the groups associated with a device. + * + * @param deviceId the device ID + * + * @return the group entries + */ + @Override + public Iterable<Group> getGroups(DeviceId deviceId) { + // flatten and make iterator unmodifiable + log.debug("getGroups: for device {} total number of groups {}", + deviceId, getGroupStoreKeyMap().values().size()); + return FluentIterable.from(getGroupStoreKeyMap().values()) + .filter(input -> input.deviceId().equals(deviceId)) + .transform(input -> input); + } + + private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) { + // flatten and make iterator unmodifiable + log.debug("getGroups: for device {} total number of groups {}", + deviceId, getGroupStoreKeyMap().values().size()); + return FluentIterable.from(getGroupStoreKeyMap().values()) + .filter(input -> input.deviceId().equals(deviceId)); + } + + /** + * Returns the stored group entry. + * + * @param deviceId the device ID + * @param appCookie the group key + * + * @return a group associated with the key + */ + @Override + public Group getGroup(DeviceId deviceId, GroupKey appCookie) { + return getStoredGroupEntry(deviceId, appCookie); + } + + private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId, + GroupKey appCookie) { + return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId, + appCookie)); + } + + @Override + public Group getGroup(DeviceId deviceId, GroupId groupId) { + return getStoredGroupEntry(deviceId, groupId); + } + + private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId, + GroupId groupId) { + return getGroupIdTable(deviceId).get(groupId); + } + + private int getFreeGroupIdValue(DeviceId deviceId) { + int freeId = groupIdGen.incrementAndGet(); + + while (true) { + Group existing = getGroup(deviceId, new DefaultGroupId(freeId)); + if (existing == null) { + existing = ( + extraneousGroupEntriesById.get(deviceId) != null) ? + extraneousGroupEntriesById.get(deviceId). + get(new DefaultGroupId(freeId)) : + null; + } + if (existing != null) { + freeId = groupIdGen.incrementAndGet(); + } else { + break; + } + } + log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId); + return freeId; + } + + /** + * Stores a new group entry using the information from group description. + * + * @param groupDesc group description to be used to create group entry + */ + @Override + public void storeGroupDescription(GroupDescription groupDesc) { + log.debug("In storeGroupDescription"); + // Check if a group is existing with the same key + if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) { + log.warn("Group already exists with the same key {}", + groupDesc.appCookie()); + return; + } + + // Check if group to be created by a remote instance + if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) { + log.debug("storeGroupDescription: Device {} local role is not MASTER", + groupDesc.deviceId()); + if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) { + log.error("No Master for device {}..." + + "Can not perform add group operation", + groupDesc.deviceId()); + //TODO: Send Group operation failure event + return; + } + GroupStoreMessage groupOp = GroupStoreMessage. + createGroupAddRequestMsg(groupDesc.deviceId(), + groupDesc); + + clusterCommunicator.unicast(groupOp, + GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, + m -> kryoBuilder.build().serialize(m), + mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> { + if (error != null) { + log.warn("Failed to send request to master: {} to {}", + groupOp, + mastershipService.getMasterFor(groupDesc.deviceId())); + //TODO: Send Group operation failure event + } else { + log.debug("Sent Group operation request for device {} " + + "to remote MASTER {}", + groupDesc.deviceId(), + mastershipService.getMasterFor(groupDesc.deviceId())); + } + }); + return; + } + + log.debug("Store group for device {} is getting handled locally", + groupDesc.deviceId()); + storeGroupDescriptionInternal(groupDesc); + } + + private Group getMatchingExtraneousGroupbyId(DeviceId deviceId, Integer groupId) { + ConcurrentMap<GroupId, Group> extraneousMap = + extraneousGroupEntriesById.get(deviceId); + if (extraneousMap == null) { + return null; + } + return extraneousMap.get(new DefaultGroupId(groupId)); + } + + private Group getMatchingExtraneousGroupbyBuckets(DeviceId deviceId, + GroupBuckets buckets) { + ConcurrentMap<GroupId, Group> extraneousMap = + extraneousGroupEntriesById.get(deviceId); + if (extraneousMap == null) { + return null; + } + + for (Group extraneousGroup:extraneousMap.values()) { + if (extraneousGroup.buckets().equals(buckets)) { + return extraneousGroup; + } + } + return null; + } + + private void storeGroupDescriptionInternal(GroupDescription groupDesc) { + // Check if a group is existing with the same key + if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) { + return; + } + + if (deviceAuditStatus.get(groupDesc.deviceId()) == null) { + // Device group audit has not completed yet + // Add this group description to pending group key table + // Create a group entry object with Dummy Group ID + log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request", + groupDesc.deviceId()); + StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc); + group.setState(GroupState.WAITING_AUDIT_COMPLETE); + EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable = + getPendingGroupKeyTable(); + pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(), + groupDesc.appCookie()), + group); + return; + } + + Group matchingExtraneousGroup = null; + if (groupDesc.givenGroupId() != null) { + //Check if there is a extraneous group existing with the same Id + matchingExtraneousGroup = getMatchingExtraneousGroupbyId( + groupDesc.deviceId(), groupDesc.givenGroupId()); + if (matchingExtraneousGroup != null) { + log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {} for group id {}", + groupDesc.deviceId(), + groupDesc.givenGroupId()); + //Check if the group buckets matches with user provided buckets + if (matchingExtraneousGroup.buckets().equals(groupDesc.buckets())) { + //Group is already existing with the same buckets and Id + // Create a group entry object + log.debug("storeGroupDescriptionInternal: Buckets also matching in Device {} for group id {}", + groupDesc.deviceId(), + groupDesc.givenGroupId()); + StoredGroupEntry group = new DefaultGroup( + matchingExtraneousGroup.id(), groupDesc); + // Insert the newly created group entry into key and id maps + getGroupStoreKeyMap(). + put(new GroupStoreKeyMapKey(groupDesc.deviceId(), + groupDesc.appCookie()), group); + // Ensure it also inserted into group id based table to + // avoid any chances of duplication in group id generation + getGroupIdTable(groupDesc.deviceId()). + put(matchingExtraneousGroup.id(), group); + addOrUpdateGroupEntry(matchingExtraneousGroup); + removeExtraneousGroupEntry(matchingExtraneousGroup); + return; + } else { + //Group buckets are not matching. Update group + //with user provided buckets. + //TODO + log.debug("storeGroupDescriptionInternal: Buckets are not matching in Device {} for group id {}", + groupDesc.deviceId(), + groupDesc.givenGroupId()); + } + } + } else { + //Check if there is an extraneous group with user provided buckets + matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets( + groupDesc.deviceId(), groupDesc.buckets()); + if (matchingExtraneousGroup != null) { + //Group is already existing with the same buckets. + //So reuse this group. + log.debug("storeGroupDescriptionInternal: Matching extraneous group found in Device {}", + groupDesc.deviceId()); + //Create a group entry object + StoredGroupEntry group = new DefaultGroup( + matchingExtraneousGroup.id(), groupDesc); + // Insert the newly created group entry into key and id maps + getGroupStoreKeyMap(). + put(new GroupStoreKeyMapKey(groupDesc.deviceId(), + groupDesc.appCookie()), group); + // Ensure it also inserted into group id based table to + // avoid any chances of duplication in group id generation + getGroupIdTable(groupDesc.deviceId()). + put(matchingExtraneousGroup.id(), group); + addOrUpdateGroupEntry(matchingExtraneousGroup); + removeExtraneousGroupEntry(matchingExtraneousGroup); + return; + } else { + //TODO: Check if there are any empty groups that can be used here + log.debug("storeGroupDescriptionInternal: No matching extraneous groups found in Device {}", + groupDesc.deviceId()); + } + } + + GroupId id = null; + if (groupDesc.givenGroupId() == null) { + // Get a new group identifier + id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId())); + } else { + id = new DefaultGroupId(groupDesc.givenGroupId()); + } + // Create a group entry object + StoredGroupEntry group = new DefaultGroup(id, groupDesc); + // Insert the newly created group entry into key and id maps + getGroupStoreKeyMap(). + put(new GroupStoreKeyMapKey(groupDesc.deviceId(), + groupDesc.appCookie()), group); + // Ensure it also inserted into group id based table to + // avoid any chances of duplication in group id generation + getGroupIdTable(groupDesc.deviceId()). + put(id, group); + log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}", + id, + groupDesc.deviceId()); + notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED, + group)); + } + + /** + * Updates the existing group entry with the information + * from group description. + * + * @param deviceId the device ID + * @param oldAppCookie the current group key + * @param type update type + * @param newBuckets group buckets for updates + * @param newAppCookie optional new group key + */ + @Override + public void updateGroupDescription(DeviceId deviceId, + GroupKey oldAppCookie, + UpdateType type, + GroupBuckets newBuckets, + GroupKey newAppCookie) { + // Check if group update to be done by a remote instance + if (mastershipService.getMasterFor(deviceId) != null && + mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) { + log.debug("updateGroupDescription: Device {} local role is not MASTER", + deviceId); + if (mastershipService.getMasterFor(deviceId) == null) { + log.error("No Master for device {}..." + + "Can not perform update group operation", + deviceId); + //TODO: Send Group operation failure event + return; + } + GroupStoreMessage groupOp = GroupStoreMessage. + createGroupUpdateRequestMsg(deviceId, + oldAppCookie, + type, + newBuckets, + newAppCookie); + + clusterCommunicator.unicast(groupOp, + GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, + m -> kryoBuilder.build().serialize(m), + mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> { + if (error != null) { + log.warn("Failed to send request to master: {} to {}", + groupOp, + mastershipService.getMasterFor(deviceId), error); + } + //TODO: Send Group operation failure event + }); + return; + } + log.debug("updateGroupDescription for device {} is getting handled locally", + deviceId); + updateGroupDescriptionInternal(deviceId, + oldAppCookie, + type, + newBuckets, + newAppCookie); + } + + private void updateGroupDescriptionInternal(DeviceId deviceId, + GroupKey oldAppCookie, + UpdateType type, + GroupBuckets newBuckets, + GroupKey newAppCookie) { + // Check if a group is existing with the provided key + Group oldGroup = getGroup(deviceId, oldAppCookie); + if (oldGroup == null) { + log.warn("updateGroupDescriptionInternal: Group not found...strange"); + return; + } + + List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup, + type, + newBuckets); + if (newBucketList != null) { + // Create a new group object from the old group + GroupBuckets updatedBuckets = new GroupBuckets(newBucketList); + GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie; + GroupDescription updatedGroupDesc = new DefaultGroupDescription( + oldGroup.deviceId(), + oldGroup.type(), + updatedBuckets, + newCookie, + oldGroup.givenGroupId(), + oldGroup.appId()); + StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(), + updatedGroupDesc); + log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE", + oldGroup.id(), + oldGroup.deviceId(), + oldGroup.state()); + newGroup.setState(GroupState.PENDING_UPDATE); + newGroup.setLife(oldGroup.life()); + newGroup.setPackets(oldGroup.packets()); + newGroup.setBytes(oldGroup.bytes()); + //Update the group entry in groupkey based map. + //Update to groupid based map will happen in the + //groupkey based map update listener + log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets", + type); + getGroupStoreKeyMap(). + put(new GroupStoreKeyMapKey(newGroup.deviceId(), + newGroup.appCookie()), newGroup); + notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup)); + } else { + log.warn("updateGroupDescriptionInternal with type {}: No " + + "change in the buckets in update", type); + } + } + + private List<GroupBucket> getUpdatedBucketList(Group oldGroup, + UpdateType type, + GroupBuckets buckets) { + GroupBuckets oldBuckets = oldGroup.buckets(); + List<GroupBucket> newBucketList = new ArrayList<GroupBucket>( + oldBuckets.buckets()); + boolean groupDescUpdated = false; + + if (type == UpdateType.ADD) { + // Check if the any of the new buckets are part of + // the old bucket list + for (GroupBucket addBucket:buckets.buckets()) { + if (!newBucketList.contains(addBucket)) { + newBucketList.add(addBucket); + groupDescUpdated = true; + } + } + } else if (type == UpdateType.REMOVE) { + // Check if the to be removed buckets are part of the + // old bucket list + for (GroupBucket removeBucket:buckets.buckets()) { + if (newBucketList.contains(removeBucket)) { + newBucketList.remove(removeBucket); + groupDescUpdated = true; + } + } + } + + if (groupDescUpdated) { + return newBucketList; + } else { + return null; + } + } + + /** + * Triggers deleting the existing group entry. + * + * @param deviceId the device ID + * @param appCookie the group key + */ + @Override + public void deleteGroupDescription(DeviceId deviceId, + GroupKey appCookie) { + // Check if group to be deleted by a remote instance + if (mastershipService. + getLocalRole(deviceId) != MastershipRole.MASTER) { + log.debug("deleteGroupDescription: Device {} local role is not MASTER", + deviceId); + if (mastershipService.getMasterFor(deviceId) == null) { + log.error("No Master for device {}..." + + "Can not perform delete group operation", + deviceId); + //TODO: Send Group operation failure event + return; + } + GroupStoreMessage groupOp = GroupStoreMessage. + createGroupDeleteRequestMsg(deviceId, + appCookie); + + clusterCommunicator.unicast(groupOp, + GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, + m -> kryoBuilder.build().serialize(m), + mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> { + if (error != null) { + log.warn("Failed to send request to master: {} to {}", + groupOp, + mastershipService.getMasterFor(deviceId), error); + } + //TODO: Send Group operation failure event + }); + return; + } + log.debug("deleteGroupDescription in device {} is getting handled locally", + deviceId); + deleteGroupDescriptionInternal(deviceId, appCookie); + } + + private void deleteGroupDescriptionInternal(DeviceId deviceId, + GroupKey appCookie) { + // Check if a group is existing with the provided key + StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie); + if (existing == null) { + return; + } + + log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE", + existing.id(), + existing.deviceId(), + existing.state()); + synchronized (existing) { + existing.setState(GroupState.PENDING_DELETE); + } + log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED", + deviceId); + notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing)); + } + + /** + * Stores a new group entry, or updates an existing entry. + * + * @param group group entry + */ + @Override + public void addOrUpdateGroupEntry(Group group) { + // check if this new entry is an update to an existing entry + StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(), + group.id()); + GroupEvent event = null; + + if (existing != null) { + log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}", + group.id(), + group.deviceId()); + synchronized (existing) { + for (GroupBucket bucket:group.buckets().buckets()) { + Optional<GroupBucket> matchingBucket = + existing.buckets().buckets() + .stream() + .filter((existingBucket)->(existingBucket.equals(bucket))) + .findFirst(); + if (matchingBucket.isPresent()) { + ((StoredGroupBucketEntry) matchingBucket. + get()).setPackets(bucket.packets()); + ((StoredGroupBucketEntry) matchingBucket. + get()).setBytes(bucket.bytes()); + } else { + log.warn("addOrUpdateGroupEntry: No matching " + + "buckets to update stats"); + } + } + existing.setLife(group.life()); + existing.setPackets(group.packets()); + existing.setBytes(group.bytes()); + if ((existing.state() == GroupState.PENDING_ADD) || + (existing.state() == GroupState.PENDING_ADD_RETRY)) { + log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED", + existing.id(), + existing.deviceId(), + existing.state()); + existing.setState(GroupState.ADDED); + existing.setIsGroupStateAddedFirstTime(true); + event = new GroupEvent(Type.GROUP_ADDED, existing); + } else { + log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED", + existing.id(), + existing.deviceId(), + GroupState.PENDING_UPDATE); + existing.setState(GroupState.ADDED); + existing.setIsGroupStateAddedFirstTime(false); + event = new GroupEvent(Type.GROUP_UPDATED, existing); + } + //Re-PUT map entries to trigger map update events + getGroupStoreKeyMap(). + put(new GroupStoreKeyMapKey(existing.deviceId(), + existing.appCookie()), existing); + } + } else { + log.warn("addOrUpdateGroupEntry: Group update " + + "happening for a non-existing entry in the map"); + } + + if (event != null) { + notifyDelegate(event); + } + } + + /** + * Removes the group entry from store. + * + * @param group group entry + */ + @Override + public void removeGroupEntry(Group group) { + StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(), + group.id()); + + if (existing != null) { + log.debug("removeGroupEntry: removing group entry {} in device {}", + group.id(), + group.deviceId()); + //Removal from groupid based map will happen in the + //map update listener + getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(), + existing.appCookie())); + notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing)); + } else { + log.warn("removeGroupEntry for {} in device{} is " + + "not existing in our maps", + group.id(), + group.deviceId()); + } + } + + @Override + public void deviceInitialAuditCompleted(DeviceId deviceId, + boolean completed) { + synchronized (deviceAuditStatus) { + if (completed) { + log.debug("AUDIT completed for device {}", + deviceId); + deviceAuditStatus.put(deviceId, true); + // Execute all pending group requests + List<StoredGroupEntry> pendingGroupRequests = + getPendingGroupKeyTable().values() + .stream() + .filter(g-> g.deviceId().equals(deviceId)) + .collect(Collectors.toList()); + log.debug("processing pending group add requests for device {} and number of pending requests {}", + deviceId, + pendingGroupRequests.size()); + for (Group group:pendingGroupRequests) { + GroupDescription tmp = new DefaultGroupDescription( + group.deviceId(), + group.type(), + group.buckets(), + group.appCookie(), + group.givenGroupId(), + group.appId()); + storeGroupDescriptionInternal(tmp); + getPendingGroupKeyTable(). + remove(new GroupStoreKeyMapKey(deviceId, group.appCookie())); + } + } else { + Boolean audited = deviceAuditStatus.get(deviceId); + if (audited != null && audited) { + log.debug("Clearing AUDIT status for device {}", deviceId); + deviceAuditStatus.put(deviceId, false); + } + } + } + } + + @Override + public boolean deviceInitialAuditStatus(DeviceId deviceId) { + synchronized (deviceAuditStatus) { + Boolean audited = deviceAuditStatus.get(deviceId); + return audited != null && audited; + } + } + + @Override + public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) { + + StoredGroupEntry existing = getStoredGroupEntry(deviceId, + operation.groupId()); + + if (existing == null) { + log.warn("No group entry with ID {} found ", operation.groupId()); + return; + } + + log.warn("groupOperationFailed: group operation {} failed" + + "for group {} in device {}", + operation.opType(), + existing.id(), + existing.deviceId()); + switch (operation.opType()) { + case ADD: + if (existing.state() == GroupState.PENDING_ADD) { + //TODO: Need to add support for passing the group + //operation failure reason from group provider. + //If the error type is anything other than GROUP_EXISTS, + //then the GROUP_ADD_FAILED event should be raised even + //in PENDING_ADD_RETRY state also. + notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing)); + log.warn("groupOperationFailed: cleaningup " + + "group {} from store in device {}....", + existing.id(), + existing.deviceId()); + //Removal from groupid based map will happen in the + //map update listener + getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(), + existing.appCookie())); + } + break; + case MODIFY: + notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing)); + break; + case DELETE: + notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing)); + break; + default: + log.warn("Unknown group operation type {}", operation.opType()); + } + } + + @Override + public void addOrUpdateExtraneousGroupEntry(Group group) { + log.debug("add/update extraneous group entry {} in device {}", + group.id(), + group.deviceId()); + ConcurrentMap<GroupId, Group> extraneousIdTable = + getExtraneousGroupIdTable(group.deviceId()); + extraneousIdTable.put(group.id(), group); + // Don't remove the extraneous groups, instead re-use it when + // a group request comes with the same set of buckets + } + + @Override + public void removeExtraneousGroupEntry(Group group) { + log.debug("remove extraneous group entry {} of device {} from store", + group.id(), + group.deviceId()); + ConcurrentMap<GroupId, Group> extraneousIdTable = + getExtraneousGroupIdTable(group.deviceId()); + extraneousIdTable.remove(group.id()); + } + + @Override + public Iterable<Group> getExtraneousGroups(DeviceId deviceId) { + // flatten and make iterator unmodifiable + return FluentIterable.from( + getExtraneousGroupIdTable(deviceId).values()); + } + + /** + * Map handler to receive any events when the group key map is updated. + */ + private class GroupStoreKeyMapListener implements + EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> { + + @Override + public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey, + StoredGroupEntry> mapEvent) { + GroupEvent groupEvent = null; + GroupStoreKeyMapKey key = mapEvent.key(); + StoredGroupEntry group = mapEvent.value(); + if ((key == null) && (group == null)) { + log.error("GroupStoreKeyMapListener: Received " + + "event {} with null entry", mapEvent.type()); + return; + } else if (group == null) { + group = getGroupIdTable(key.deviceId()).values() + .stream() + .filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie))) + .findFirst().get(); + if (group == null) { + log.error("GroupStoreKeyMapListener: Received " + + "event {} with null entry... can not process", mapEvent.type()); + return; + } + } + log.trace("received groupid map event {} for id {} in device {}", + mapEvent.type(), + group.id(), + key.deviceId()); + if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) { + // Update the group ID table + getGroupIdTable(group.deviceId()).put(group.id(), group); + if (mapEvent.value().state() == Group.GroupState.ADDED) { + if (mapEvent.value().isGroupStateAddedFirstTime()) { + groupEvent = new GroupEvent(Type.GROUP_ADDED, + mapEvent.value()); + log.trace("Received first time GROUP_ADDED state update for id {} in device {}", + group.id(), + group.deviceId()); + } else { + groupEvent = new GroupEvent(Type.GROUP_UPDATED, + mapEvent.value()); + log.trace("Received following GROUP_ADDED state update for id {} in device {}", + group.id(), + group.deviceId()); + } + } + } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) { + groupEvent = new GroupEvent(Type.GROUP_REMOVED, group); + // Remove the entry from the group ID table + getGroupIdTable(group.deviceId()).remove(group.id(), group); + } + + if (groupEvent != null) { + notifyDelegate(groupEvent); + } + } + } + + private void process(GroupStoreMessage groupOp) { + log.debug("Received remote group operation {} request for device {}", + groupOp.type(), + groupOp.deviceId()); + if (!mastershipService.isLocalMaster(groupOp.deviceId())) { + log.warn("This node is not MASTER for device {}", groupOp.deviceId()); + return; + } + if (groupOp.type() == GroupStoreMessage.Type.ADD) { + storeGroupDescriptionInternal(groupOp.groupDesc()); + } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) { + updateGroupDescriptionInternal(groupOp.deviceId(), + groupOp.appCookie(), + groupOp.updateType(), + groupOp.updateBuckets(), + groupOp.newAppCookie()); + } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) { + deleteGroupDescriptionInternal(groupOp.deviceId(), + groupOp.appCookie()); + } + } + + /** + * Flattened map key to be used to store group entries. + */ + protected static class GroupStoreMapKey { + private final DeviceId deviceId; + + public GroupStoreMapKey(DeviceId deviceId) { + this.deviceId = deviceId; + } + + public DeviceId deviceId() { + return deviceId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof GroupStoreMapKey)) { + return false; + } + GroupStoreMapKey that = (GroupStoreMapKey) o; + return this.deviceId.equals(that.deviceId); + } + + @Override + public int hashCode() { + int result = 17; + + result = 31 * result + Objects.hash(this.deviceId); + + return result; + } + } + + protected static class GroupStoreKeyMapKey extends GroupStoreMapKey { + private final GroupKey appCookie; + public GroupStoreKeyMapKey(DeviceId deviceId, + GroupKey appCookie) { + super(deviceId); + this.appCookie = appCookie; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof GroupStoreKeyMapKey)) { + return false; + } + GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o; + return (super.equals(that) && + this.appCookie.equals(that.appCookie)); + } + + @Override + public int hashCode() { + int result = 17; + + result = 31 * result + super.hashCode() + Objects.hash(this.appCookie); + + return result; + } + } + + protected static class GroupStoreIdMapKey extends GroupStoreMapKey { + private final GroupId groupId; + public GroupStoreIdMapKey(DeviceId deviceId, + GroupId groupId) { + super(deviceId); + this.groupId = groupId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof GroupStoreIdMapKey)) { + return false; + } + GroupStoreIdMapKey that = (GroupStoreIdMapKey) o; + return (super.equals(that) && + this.groupId.equals(that.groupId)); + } + + @Override + public int hashCode() { + int result = 17; + + result = 31 * result + super.hashCode() + Objects.hash(this.groupId); + + return result; + } + } + + @Override + public void pushGroupMetrics(DeviceId deviceId, + Collection<Group> groupEntries) { + boolean deviceInitialAuditStatus = + deviceInitialAuditStatus(deviceId); + Set<Group> southboundGroupEntries = + Sets.newHashSet(groupEntries); + Set<StoredGroupEntry> storedGroupEntries = + Sets.newHashSet(getStoredGroups(deviceId)); + Set<Group> extraneousStoredEntries = + Sets.newHashSet(getExtraneousGroups(deviceId)); + + log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}", + southboundGroupEntries.size(), + deviceId); + for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) { + Group group = it.next(); + log.trace("Group {} in device {}", group, deviceId); + } + + log.trace("Displaying all ({}) stored group entries for device {}", + storedGroupEntries.size(), + deviceId); + for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator(); + it1.hasNext();) { + Group group = it1.next(); + log.trace("Stored Group {} for device {}", group, deviceId); + } + + for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) { + Group group = it2.next(); + if (storedGroupEntries.remove(group)) { + // we both have the group, let's update some info then. + log.trace("Group AUDIT: group {} exists in both planes for device {}", + group.id(), deviceId); + groupAdded(group); + it2.remove(); + } + } + for (Group group : southboundGroupEntries) { + if (getGroup(group.deviceId(), group.id()) != null) { + // There is a group existing with the same id + // It is possible that group update is + // in progress while we got a stale info from switch + if (!storedGroupEntries.remove(getGroup( + group.deviceId(), group.id()))) { + log.warn("Group AUDIT: Inconsistent state:" + + "Group exists in ID based table while " + + "not present in key based table"); + } + } else { + // there are groups in the switch that aren't in the store + log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}", + group.id(), deviceId); + extraneousStoredEntries.remove(group); + extraneousGroup(group); + } + } + for (Group group : storedGroupEntries) { + // there are groups in the store that aren't in the switch + log.debug("Group AUDIT: group {} missing in data plane for device {}", + group.id(), deviceId); + groupMissing(group); + } + for (Group group : extraneousStoredEntries) { + // there are groups in the extraneous store that + // aren't in the switch + log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}", + group.id(), deviceId); + removeExtraneousGroupEntry(group); + } + + if (!deviceInitialAuditStatus) { + log.debug("Group AUDIT: Setting device {} initial AUDIT completed", + deviceId); + deviceInitialAuditCompleted(deviceId, true); + } + } + + private void groupMissing(Group group) { + switch (group.state()) { + case PENDING_DELETE: + log.debug("Group {} delete confirmation from device {}", + group, group.deviceId()); + removeGroupEntry(group); + break; + case ADDED: + case PENDING_ADD: + case PENDING_ADD_RETRY: + case PENDING_UPDATE: + log.debug("Group {} is in store but not on device {}", + group, group.deviceId()); + StoredGroupEntry existing = + getStoredGroupEntry(group.deviceId(), group.id()); + log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY", + existing.id(), + existing.deviceId(), + existing.state()); + existing.setState(Group.GroupState.PENDING_ADD_RETRY); + //Re-PUT map entries to trigger map update events + getGroupStoreKeyMap(). + put(new GroupStoreKeyMapKey(existing.deviceId(), + existing.appCookie()), existing); + notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED, + group)); + break; + default: + log.debug("Group {} has not been installed.", group); + break; + } + } + + private void extraneousGroup(Group group) { + log.debug("Group {} is on device {} but not in store.", + group, group.deviceId()); + addOrUpdateExtraneousGroupEntry(group); + } + + private void groupAdded(Group group) { + log.trace("Group {} Added or Updated in device {}", + group, group.deviceId()); + addOrUpdateGroupEntry(group); + } +} |