diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java new file mode 100644 index 00000000..648119e5 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java @@ -0,0 +1,349 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.newresource.impl; + +import com.google.common.annotations.Beta; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.newresource.ResourceConsumer; +import org.onosproject.net.newresource.ResourcePath; +import org.onosproject.net.newresource.ResourceStore; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.TransactionContext; +import org.onosproject.store.service.TransactionException; +import org.onosproject.store.service.TransactionalMap; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Implementation of ResourceStore using TransactionalMap. + */ +@Component(immediate = true, enabled = false) +@Service +@Beta +public class ConsistentResourceStore implements ResourceStore { + private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class); + + private static final String CONSUMER_MAP = "onos-resource-consumers"; + private static final String CHILD_MAP = "onos-resource-children"; + private static final Serializer SERIALIZER = Serializer.using( + Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API)); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService service; + + private ConsistentMap<ResourcePath, ResourceConsumer> consumerMap; + private ConsistentMap<ResourcePath, List<ResourcePath>> childMap; + + @Activate + public void activate() { + consumerMap = service.<ResourcePath, ResourceConsumer>consistentMapBuilder() + .withName(CONSUMER_MAP) + .withSerializer(SERIALIZER) + .build(); + childMap = service.<ResourcePath, List<ResourcePath>>consistentMapBuilder() + .withName(CHILD_MAP) + .withSerializer(SERIALIZER) + .build(); + } + + @Override + public Optional<ResourceConsumer> getConsumer(ResourcePath resource) { + checkNotNull(resource); + + Versioned<ResourceConsumer> consumer = consumerMap.get(resource); + if (consumer == null) { + return Optional.empty(); + } + + return Optional.of(consumer.value()); + } + + @Override + public boolean register(List<ResourcePath> resources) { + checkNotNull(resources); + + TransactionContext tx = service.transactionContextBuilder().build(); + tx.begin(); + + try { + TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = + tx.getTransactionalMap(CHILD_MAP, SERIALIZER); + + Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream() + .filter(x -> x.parent().isPresent()) + .collect(Collectors.groupingBy(x -> x.parent().get())); + + for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) { + if (!isRegistered(childTxMap, entry.getKey())) { + return abortTransaction(tx); + } + + if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) { + return abortTransaction(tx); + } + } + + return commitTransaction(tx); + } catch (TransactionException e) { + log.error("Exception thrown, abort the transaction", e); + return abortTransaction(tx); + } + } + + @Override + public boolean unregister(List<ResourcePath> resources) { + checkNotNull(resources); + + TransactionContext tx = service.transactionContextBuilder().build(); + tx.begin(); + + try { + TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = + tx.getTransactionalMap(CHILD_MAP, SERIALIZER); + TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = + tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); + + Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream() + .filter(x -> x.parent().isPresent()) + .collect(Collectors.groupingBy(x -> x.parent().get())); + + // even if one of the resources is allocated to a consumer, + // all unregistrations are regarded as failure + for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) { + if (entry.getValue().stream().anyMatch(x -> consumerTxMap.get(x) != null)) { + return abortTransaction(tx); + } + + if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) { + return abortTransaction(tx); + } + } + + return commitTransaction(tx); + } catch (TransactionException e) { + log.error("Exception thrown, abort the transaction", e); + return abortTransaction(tx); + } + } + + @Override + public boolean allocate(List<ResourcePath> resources, ResourceConsumer consumer) { + checkNotNull(resources); + checkNotNull(consumer); + + TransactionContext tx = service.transactionContextBuilder().build(); + tx.begin(); + + try { + TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = + tx.getTransactionalMap(CHILD_MAP, SERIALIZER); + TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = + tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); + + for (ResourcePath resource: resources) { + if (!isRegistered(childTxMap, resource)) { + return abortTransaction(tx); + } + + ResourceConsumer oldValue = consumerTxMap.put(resource, consumer); + if (oldValue != null) { + return abortTransaction(tx); + } + } + + return commitTransaction(tx); + } catch (TransactionException e) { + log.error("Exception thrown, abort the transaction", e); + return abortTransaction(tx); + } + } + + @Override + public boolean release(List<ResourcePath> resources, List<ResourceConsumer> consumers) { + checkNotNull(resources); + checkNotNull(consumers); + checkArgument(resources.size() == consumers.size()); + + TransactionContext tx = service.transactionContextBuilder().build(); + tx.begin(); + + try { + TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = + tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); + Iterator<ResourcePath> resourceIte = resources.iterator(); + Iterator<ResourceConsumer> consumerIte = consumers.iterator(); + + while (resourceIte.hasNext() && consumerIte.hasNext()) { + ResourcePath resource = resourceIte.next(); + ResourceConsumer consumer = consumerIte.next(); + + // if this single release fails (because the resource is allocated to another consumer, + // the whole release fails + if (!consumerTxMap.remove(resource, consumer)) { + return abortTransaction(tx); + } + } + + return commitTransaction(tx); + } catch (TransactionException e) { + log.error("Exception thrown, abort the transaction", e); + return abortTransaction(tx); + } + } + + @Override + public Collection<ResourcePath> getResources(ResourceConsumer consumer) { + checkNotNull(consumer); + + // NOTE: getting all entries may become performance bottleneck + // TODO: revisit for better backend data structure + return consumerMap.entrySet().stream() + .filter(x -> x.getValue().value().equals(consumer)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + @Override + public <T> Collection<ResourcePath> getAllocatedResources(ResourcePath parent, Class<T> cls) { + checkNotNull(parent); + checkNotNull(cls); + + Versioned<List<ResourcePath>> children = childMap.get(parent); + if (children == null) { + return Collections.emptyList(); + } + + return children.value().stream() + .filter(x -> x.lastComponent().getClass().equals(cls)) + .filter(consumerMap::containsKey) + .collect(Collectors.toList()); + } + + /** + * Abort the transaction. + * + * @param tx transaction context + * @return always false + */ + private boolean abortTransaction(TransactionContext tx) { + tx.abort(); + return false; + } + + /** + * Commit the transaction. + * + * @param tx transaction context + * @return always true + */ + private boolean commitTransaction(TransactionContext tx) { + tx.commit(); + return true; + } + + /** + * Appends the values to the existing values associated with the specified key. + * If the map already has all the given values, appending will not happen. + * + * @param map map holding multiple values for a key + * @param key key specifying values + * @param values values to be appended + * @param <K> type of the key + * @param <V> type of the element of the list + * @return true if the operation succeeds, false otherwise. + */ + private <K, V> boolean appendValues(TransactionalMap<K, List<V>> map, K key, List<V> values) { + List<V> oldValues = map.get(key); + if (oldValues == null) { + return map.replace(key, oldValues, new ArrayList<>(values)); + } + + LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues); + if (oldSet.containsAll(values)) { + // don't write to map because all values are already stored + return true; + } + + oldSet.addAll(values); + return map.replace(key, oldValues, new ArrayList<>(oldSet)); + } + + /** + * Removes teh values from the existing values associated with the specified key. + * If the map doesn't contain the given values, removal will not happen. + * + * @param map map holding multiple values for a key + * @param key key specifying values + * @param values values to be removed + * @param <K> type of the key + * @param <V> type of the element of the list + * @return true if the operation succeeds, false otherwise + */ + private <K, V> boolean removeValues(TransactionalMap<K, List<V>> map, K key, List<V> values) { + List<V> oldValues = map.get(key); + if (oldValues == null) { + return map.replace(key, oldValues, new ArrayList<>()); + } + + LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues); + if (values.stream().allMatch(x -> !oldSet.contains(x))) { + // don't write map because none of the values are stored + return true; + } + + oldSet.removeAll(values); + return map.replace(key, oldValues, new ArrayList<>(oldSet)); + } + + /** + * Checks if the specified resource is registered as a child of a resource in the map. + * + * @param map map storing parent - child relationship of resources + * @param resource resource to be checked + * @return true if the resource is registered, false otherwise. + */ + private boolean isRegistered(TransactionalMap<ResourcePath, List<ResourcePath>> map, ResourcePath resource) { + // root is always regarded to be registered + if (resource.isRoot()) { + return true; + } + + List<ResourcePath> value = map.get(resource.parent().get()); + return value != null && value.contains(resource); + } +} |