aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java503
1 files changed, 503 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java
new file mode 100644
index 00000000..ce25f868
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java
@@ -0,0 +1,503 @@
+package org.onosproject.store.resource.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.onlab.util.Bandwidth;
+import org.onosproject.net.OmsPort;
+import org.onosproject.net.device.DeviceService;
+import org.slf4j.Logger;
+import org.onlab.util.PositionalParameterStringFormatter;
+import org.onosproject.net.Link;
+import org.onosproject.net.LinkKey;
+import org.onosproject.net.Port;
+import org.onosproject.net.intent.IntentId;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.net.resource.link.BandwidthResource;
+import org.onosproject.net.resource.link.BandwidthResourceAllocation;
+import org.onosproject.net.resource.link.LambdaResource;
+import org.onosproject.net.resource.link.LambdaResourceAllocation;
+import org.onosproject.net.resource.link.LinkResourceAllocations;
+import org.onosproject.net.resource.link.LinkResourceEvent;
+import org.onosproject.net.resource.link.LinkResourceStore;
+import org.onosproject.net.resource.link.LinkResourceStoreDelegate;
+import org.onosproject.net.resource.link.MplsLabel;
+import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
+import org.onosproject.net.resource.ResourceAllocation;
+import org.onosproject.net.resource.ResourceAllocationException;
+import org.onosproject.net.resource.ResourceType;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TransactionContext;
+import org.onosproject.store.service.TransactionException;
+import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.AnnotationKeys.BANDWIDTH;
+
+/**
+ * Store that manages link resources using Copycat-backed TransactionalMaps.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class ConsistentLinkResourceStore extends
+ AbstractStore<LinkResourceEvent, LinkResourceStoreDelegate> implements
+ LinkResourceStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final BandwidthResource DEFAULT_BANDWIDTH = new BandwidthResource(Bandwidth.mbps(1_000));
+ private static final BandwidthResource EMPTY_BW = new BandwidthResource(Bandwidth.bps(0));
+
+ // Smallest non-reserved MPLS label
+ private static final int MIN_UNRESERVED_LABEL = 0x10;
+ // Max non-reserved MPLS label = 239
+ private static final int MAX_UNRESERVED_LABEL = 0xEF;
+
+ // table to store current allocations
+ /** LinkKey -> List<LinkResourceAllocations>. */
+ private static final String LINK_RESOURCE_ALLOCATIONS = "LinkAllocations";
+
+ /** IntentId -> LinkResourceAllocations. */
+ private static final String INTENT_ALLOCATIONS = "LinkIntentAllocations";
+
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
+
+ // for reading committed values.
+ private ConsistentMap<IntentId, LinkResourceAllocations> intentAllocMap;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkService linkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Activate
+ public void activate() {
+ intentAllocMap = storageService.<IntentId, LinkResourceAllocations>consistentMapBuilder()
+ .withName(INTENT_ALLOCATIONS)
+ .withSerializer(SERIALIZER)
+ .build();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ private TransactionalMap<IntentId, LinkResourceAllocations> getIntentAllocs(TransactionContext tx) {
+ return tx.getTransactionalMap(INTENT_ALLOCATIONS, SERIALIZER);
+ }
+
+ private TransactionalMap<LinkKey, List<LinkResourceAllocations>> getLinkAllocs(TransactionContext tx) {
+ return tx.getTransactionalMap(LINK_RESOURCE_ALLOCATIONS, SERIALIZER);
+ }
+
+ private TransactionContext getTxContext() {
+ return storageService.transactionContextBuilder().build();
+ }
+
+ private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) {
+ if (type == ResourceType.BANDWIDTH) {
+ return ImmutableSet.of(getBandwidthResourceCapacity(link));
+ }
+ if (type == ResourceType.LAMBDA) {
+ return getLambdaResourceCapacity(link);
+ }
+ if (type == ResourceType.MPLS_LABEL) {
+ return getMplsResourceCapacity();
+ }
+ return ImmutableSet.of();
+ }
+
+ private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) {
+ Set<LambdaResourceAllocation> allocations = new HashSet<>();
+ Port port = deviceService.getPort(link.src().deviceId(), link.src().port());
+ if (port instanceof OmsPort) {
+ OmsPort omsPort = (OmsPort) port;
+
+ // Assume fixed grid for now
+ for (int i = 0; i < omsPort.totalChannels(); i++) {
+ allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i)));
+ }
+ }
+ return allocations;
+ }
+
+ private BandwidthResourceAllocation getBandwidthResourceCapacity(Link link) {
+
+ // if Link annotation exist, use them
+ // if all fails, use DEFAULT_BANDWIDTH
+ BandwidthResource bandwidth = null;
+ String strBw = link.annotations().value(BANDWIDTH);
+ if (strBw != null) {
+ try {
+ bandwidth = new BandwidthResource(Bandwidth.mbps(Double.parseDouble(strBw)));
+ } catch (NumberFormatException e) {
+ // do nothings
+ bandwidth = null;
+ }
+ }
+
+ if (bandwidth == null) {
+ // fall back, use fixed default
+ bandwidth = DEFAULT_BANDWIDTH;
+ }
+ return new BandwidthResourceAllocation(bandwidth);
+ }
+
+ private Set<MplsLabelResourceAllocation> getMplsResourceCapacity() {
+ Set<MplsLabelResourceAllocation> allocations = new HashSet<>();
+ //Ignoring reserved labels of 0 through 15
+ for (int i = MIN_UNRESERVED_LABEL; i <= MAX_UNRESERVED_LABEL; i++) {
+ allocations.add(new MplsLabelResourceAllocation(MplsLabel
+ .valueOf(i)));
+
+ }
+ return allocations;
+ }
+
+ private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) {
+ Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>();
+ for (ResourceType type : ResourceType.values()) {
+ Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link);
+ if (cap != null) {
+ caps.put(type, cap);
+ }
+ }
+ return caps;
+ }
+
+ @Override
+ public Set<ResourceAllocation> getFreeResources(Link link) {
+ TransactionContext tx = getTxContext();
+
+ tx.begin();
+ try {
+ Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(tx, link);
+ Set<ResourceAllocation> allFree = new HashSet<>();
+ freeResources.values().forEach(allFree::addAll);
+ return allFree;
+ } finally {
+ tx.abort();
+ }
+ }
+
+ private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(TransactionContext tx, Link link) {
+ checkNotNull(tx);
+ checkNotNull(link);
+
+ Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>();
+ final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link);
+ final Iterable<LinkResourceAllocations> allocations = getAllocations(tx, link);
+
+ for (ResourceType type : ResourceType.values()) {
+ // there should be class/category of resources
+
+ switch (type) {
+ case BANDWIDTH:
+ Set<? extends ResourceAllocation> bw = caps.get(type);
+ if (bw == null || bw.isEmpty()) {
+ bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW));
+ }
+
+ BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next();
+ double freeBw = cap.bandwidth().toDouble();
+
+ // enumerate current allocations, subtracting resources
+ for (LinkResourceAllocations alloc : allocations) {
+ Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
+ for (ResourceAllocation a : types) {
+ if (a instanceof BandwidthResourceAllocation) {
+ BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a;
+ freeBw -= bwA.bandwidth().toDouble();
+ }
+ }
+ }
+
+ free.put(type, Sets.newHashSet(
+ new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(freeBw)))));
+ break;
+ case LAMBDA:
+ Set<? extends ResourceAllocation> lmd = caps.get(type);
+ if (lmd == null || lmd.isEmpty()) {
+ // nothing left
+ break;
+ }
+ Set<LambdaResourceAllocation> freeL = new HashSet<>();
+ for (ResourceAllocation r : lmd) {
+ if (r instanceof LambdaResourceAllocation) {
+ freeL.add((LambdaResourceAllocation) r);
+ }
+ }
+
+ // enumerate current allocations, removing resources
+ for (LinkResourceAllocations alloc : allocations) {
+ Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
+ for (ResourceAllocation a : types) {
+ if (a instanceof LambdaResourceAllocation) {
+ freeL.remove(a);
+ }
+ }
+ }
+
+ free.put(type, freeL);
+ break;
+ case MPLS_LABEL:
+ Set<? extends ResourceAllocation> mpls = caps.get(type);
+ if (mpls == null || mpls.isEmpty()) {
+ // nothing left
+ break;
+ }
+ Set<MplsLabelResourceAllocation> freeLabel = new HashSet<>();
+ for (ResourceAllocation r : mpls) {
+ if (r instanceof MplsLabelResourceAllocation) {
+ freeLabel.add((MplsLabelResourceAllocation) r);
+ }
+ }
+
+ // enumerate current allocations, removing resources
+ for (LinkResourceAllocations alloc : allocations) {
+ Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
+ for (ResourceAllocation a : types) {
+ if (a instanceof MplsLabelResourceAllocation) {
+ freeLabel.remove(a);
+ }
+ }
+ }
+
+ free.put(type, freeLabel);
+ break;
+ default:
+ log.debug("unsupported ResourceType {}", type);
+ break;
+ }
+ }
+ return free;
+ }
+
+ @Override
+ public void allocateResources(LinkResourceAllocations allocations) {
+ checkNotNull(allocations);
+ TransactionContext tx = getTxContext();
+
+ tx.begin();
+ try {
+ TransactionalMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx);
+ intentAllocs.put(allocations.intentId(), allocations);
+ allocations.links().forEach(link -> allocateLinkResource(tx, link, allocations));
+ tx.commit();
+ } catch (Exception e) {
+ log.error("Exception thrown, rolling back", e);
+ tx.abort();
+ throw e;
+ }
+ }
+
+ private void allocateLinkResource(TransactionContext tx, Link link,
+ LinkResourceAllocations allocations) {
+ // requested resources
+ Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link);
+ Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(tx, link);
+ for (ResourceAllocation req : reqs) {
+ Set<? extends ResourceAllocation> avail = available.get(req.type());
+ if (req instanceof BandwidthResourceAllocation) {
+ // check if allocation should be accepted
+ if (avail.isEmpty()) {
+ checkState(!avail.isEmpty(),
+ "There's no Bandwidth resource on %s?",
+ link);
+ }
+ BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next();
+ double bwLeft = bw.bandwidth().toDouble();
+ BandwidthResourceAllocation bwReq = ((BandwidthResourceAllocation) req);
+ bwLeft -= bwReq.bandwidth().toDouble();
+ if (bwLeft < 0) {
+ throw new ResourceAllocationException(
+ PositionalParameterStringFormatter.format(
+ "Unable to allocate bandwidth for link {} "
+ + " requested amount is {} current allocation is {}",
+ link,
+ bwReq.bandwidth().toDouble(),
+ bw));
+ }
+ } else if (req instanceof LambdaResourceAllocation) {
+ LambdaResourceAllocation lambdaAllocation = (LambdaResourceAllocation) req;
+ // check if allocation should be accepted
+ if (!avail.contains(req)) {
+ // requested lambda was not available
+ throw new ResourceAllocationException(
+ PositionalParameterStringFormatter.format(
+ "Unable to allocate lambda for link {} lambda is {}",
+ link,
+ lambdaAllocation.lambda().toInt()));
+ }
+ } else if (req instanceof MplsLabelResourceAllocation) {
+ MplsLabelResourceAllocation mplsAllocation = (MplsLabelResourceAllocation) req;
+ if (!avail.contains(req)) {
+ throw new ResourceAllocationException(
+ PositionalParameterStringFormatter
+ .format("Unable to allocate MPLS label for link "
+ + "{} MPLS label is {}",
+ link,
+ mplsAllocation
+ .mplsLabel()
+ .toString()));
+ }
+ }
+ }
+ // all requests allocatable => add allocation
+ final LinkKey linkKey = LinkKey.linkKey(link);
+ TransactionalMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
+ List<LinkResourceAllocations> before = linkAllocs.get(linkKey);
+ if (before == null) {
+ List<LinkResourceAllocations> after = new ArrayList<>();
+ after.add(allocations);
+ before = linkAllocs.putIfAbsent(linkKey, after);
+ if (before != null) {
+ // concurrent allocation detected, retry transaction : is this needed?
+ log.warn("Concurrent Allocation, retrying");
+ throw new TransactionException();
+ }
+ } else {
+ List<LinkResourceAllocations> after = new ArrayList<>(before.size() + 1);
+ after.addAll(before);
+ after.add(allocations);
+ linkAllocs.replace(linkKey, before, after);
+ }
+ }
+
+ @Override
+ public LinkResourceEvent releaseResources(LinkResourceAllocations allocations) {
+ checkNotNull(allocations);
+
+ final IntentId intentId = allocations.intentId();
+ final Collection<Link> links = allocations.links();
+ boolean success = false;
+ do {
+ TransactionContext tx = getTxContext();
+ tx.begin();
+ try {
+ TransactionalMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx);
+ intentAllocs.remove(intentId);
+
+ TransactionalMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
+ links.forEach(link -> {
+ final LinkKey linkId = LinkKey.linkKey(link);
+
+ List<LinkResourceAllocations> before = linkAllocs.get(linkId);
+ if (before == null || before.isEmpty()) {
+ // something is wrong, but it is already freed
+ log.warn("There was no resource left to release on {}", linkId);
+ return;
+ }
+ List<LinkResourceAllocations> after = new ArrayList<>(before);
+ after.remove(allocations);
+ linkAllocs.replace(linkId, before, after);
+ });
+ tx.commit();
+ success = true;
+ } catch (TransactionException e) {
+ log.debug("Transaction failed, retrying", e);
+ tx.abort();
+ } catch (Exception e) {
+ log.error("Exception thrown during releaseResource {}", allocations, e);
+ tx.abort();
+ throw e;
+ }
+ } while (!success);
+
+ // Issue events to force recompilation of intents.
+ final List<LinkResourceAllocations> releasedResources = ImmutableList.of(allocations);
+ return new LinkResourceEvent(
+ LinkResourceEvent.Type.ADDITIONAL_RESOURCES_AVAILABLE,
+ releasedResources);
+
+ }
+
+ @Override
+ public LinkResourceAllocations getAllocations(IntentId intentId) {
+ checkNotNull(intentId);
+ Versioned<LinkResourceAllocations> alloc = null;
+ try {
+ alloc = intentAllocMap.get(intentId);
+ } catch (Exception e) {
+ log.warn("Could not read resource allocation information", e);
+ }
+ return alloc == null ? null : alloc.value();
+ }
+
+ @Override
+ public Iterable<LinkResourceAllocations> getAllocations(Link link) {
+ checkNotNull(link);
+ TransactionContext tx = getTxContext();
+ Iterable<LinkResourceAllocations> res = null;
+ tx.begin();
+ try {
+ res = getAllocations(tx, link);
+ } finally {
+ tx.abort();
+ }
+ return res == null ? Collections.emptyList() : res;
+ }
+
+ @Override
+ public Iterable<LinkResourceAllocations> getAllocations() {
+ try {
+ Set<LinkResourceAllocations> allocs =
+ intentAllocMap.values().stream().map(Versioned::value).collect(Collectors.toSet());
+ return ImmutableSet.copyOf(allocs);
+ } catch (Exception e) {
+ log.warn("Could not read resource allocation information", e);
+ }
+ return ImmutableSet.of();
+ }
+
+ private Iterable<LinkResourceAllocations> getAllocations(TransactionContext tx, Link link) {
+ checkNotNull(tx);
+ checkNotNull(link);
+ final LinkKey key = LinkKey.linkKey(link);
+ TransactionalMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
+ List<LinkResourceAllocations> res = null;
+
+ res = linkAllocs.get(key);
+ if (res == null) {
+ res = linkAllocs.putIfAbsent(key, new ArrayList<>());
+
+ if (res == null) {
+ return Collections.emptyList();
+ } else {
+ return res;
+ }
+ }
+ return res;
+ }
+
+}