aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java')
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java543
1 files changed, 543 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
new file mode 100644
index 00000000..a014504c
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
@@ -0,0 +1,543 @@
+package org.onosproject.incubator.store.resource.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
+import org.onosproject.incubator.net.resource.label.LabelResource;
+import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
+import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
+import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
+import org.onosproject.incubator.net.resource.label.LabelResourceId;
+import org.onosproject.incubator.net.resource.label.LabelResourcePool;
+import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
+import org.onosproject.incubator.net.resource.label.LabelResourceStore;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+/**
+ * Manages label resources using copycat.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedLabelResourceStore
+ extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
+ implements LabelResourceStore {
+ private final Logger log = getLogger(getClass());
+
+ private static final String POOL_MAP_NAME = "labelresourcepool";
+
+ private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
+
+ private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private ExecutorService messageHandlingExecutor;
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+ private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
+
+ private static final Serializer SERIALIZER = Serializer
+ .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
+ .register(LabelResourceEvent.class)
+ .register(LabelResourcePool.class).register(DeviceId.class)
+ .register(LabelResourceRequest.class)
+ .register(LabelResourceRequest.Type.class)
+ .register(LabelResourceEvent.Type.class)
+ .register(DefaultLabelResource.class)
+ .register(LabelResourceId.class)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
+
+ @Activate
+ public void activate() {
+
+ resourcePool = storageService
+ .<DeviceId, LabelResourcePool>consistentMapBuilder()
+ .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
+ .withPartitionsDisabled().build();
+ messageHandlingExecutor = Executors
+ .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/flow",
+ "message-handlers"));
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LabelResourcePool operation = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ operation);
+ boolean b = internalCreate(operation);
+ message.respond(SERIALIZER.encode(b));
+ }
+ }, messageHandlingExecutor);
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ DeviceId deviceId = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ deviceId);
+ boolean b = internalDestroy(deviceId);
+ message.respond(SERIALIZER.encode(b));
+ }
+ }, messageHandlingExecutor);
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LabelResourceRequest request = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ request);
+ final Collection<LabelResource> resource = internalApply(request);
+ message.respond(SERIALIZER
+ .encode(resource));
+ }
+ }, messageHandlingExecutor);
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LabelResourceRequest request = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ request);
+ final boolean isSuccess = internalRelease(request);
+ message.respond(SERIALIZER
+ .encode(isSuccess));
+ }
+ }, messageHandlingExecutor);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
+ messageHandlingExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public boolean createDevicePool(DeviceId deviceId,
+ LabelResourceId beginLabel,
+ LabelResourceId endLabel) {
+ LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
+ beginLabel.labelId(),
+ endLabel.labelId());
+ return this.create(pool);
+ }
+
+ @Override
+ public boolean createGlobalPool(LabelResourceId beginLabel,
+ LabelResourceId endLabel) {
+ LabelResourcePool pool = new LabelResourcePool(
+ GLOBAL_RESOURCE_POOL_DEVICE_ID,
+ beginLabel.labelId(),
+ endLabel.labelId());
+ return this.internalCreate(pool);
+ }
+
+ private boolean create(LabelResourcePool pool) {
+ Device device = (Device) deviceService.getDevice(pool.deviceId());
+ if (device == null) {
+ return false;
+ }
+
+ NodeId master = mastershipService.getMasterFor(pool.deviceId());
+
+ if (master == null) {
+ log.warn("Failed to create label resource pool: No master for {}", pool);
+ return false;
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalCreate(pool);
+ }
+
+ log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+ master, pool.deviceId());
+
+ return complete(clusterCommunicator
+ .sendAndReceive(pool,
+ LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+
+ private boolean internalCreate(LabelResourcePool pool) {
+ Versioned<LabelResourcePool> poolOld = resourcePool
+ .get(pool.deviceId());
+ if (poolOld == null) {
+ resourcePool.put(pool.deviceId(), pool);
+ LabelResourceEvent event = new LabelResourceEvent(
+ Type.POOL_CREATED,
+ pool);
+ notifyDelegate(event);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean destroyDevicePool(DeviceId deviceId) {
+ Device device = (Device) deviceService.getDevice(deviceId);
+ if (device == null) {
+ return false;
+ }
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
+ return false;
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalDestroy(deviceId);
+ }
+
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
+
+ return complete(clusterCommunicator
+ .sendAndReceive(deviceId,
+ LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+
+ private boolean internalDestroy(DeviceId deviceId) {
+ Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
+ if (poolOld != null) {
+ resourcePool.remove(deviceId);
+ LabelResourceEvent event = new LabelResourceEvent(
+ Type.POOL_CREATED,
+ poolOld.value());
+ notifyDelegate(event);
+ }
+ log.info("success to destroy the label resource pool of device id {}",
+ deviceId);
+ return true;
+ }
+
+ @Override
+ public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
+ long applyNum) {
+ Device device = (Device) deviceService.getDevice(deviceId);
+ if (device == null) {
+ return Collections.emptyList();
+ }
+ LabelResourceRequest request = new LabelResourceRequest(
+ deviceId,
+ LabelResourceRequest.Type.APPLY,
+ applyNum, null);
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
+ return Collections.emptyList();
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalApply(request);
+ }
+
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
+
+ return complete(clusterCommunicator
+ .sendAndReceive(request,
+ LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+
+ private Collection<LabelResource> internalApply(LabelResourceRequest request) {
+ DeviceId deviceId = request.deviceId();
+ long applyNum = request.applyNum();
+ Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
+ LabelResourcePool pool = poolOld.value();
+ Collection<LabelResource> result = new HashSet<LabelResource>();
+ long freeNum = this.getFreeNumOfDevicePool(deviceId);
+ if (applyNum > freeNum) {
+ log.info("the free number of the label resource pool of deviceId {} is not enough.");
+ return Collections.emptyList();
+ }
+ Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
+ pool.releaseLabelId());
+ long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
+ .size();
+ LabelResource resource = null;
+ for (int i = 0; i < tmp; i++) {
+ Iterator<LabelResource> it = releaseLabels.iterator();
+ if (it.hasNext()) {
+ resource = it.next();
+ releaseLabels.remove(resource);
+ }
+ result.add(resource);
+ }
+ for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
+ .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
+ resource = new DefaultLabelResource(deviceId,
+ LabelResourceId
+ .labelResourceId(j));
+ result.add(resource);
+ }
+ long beginLabel = pool.beginLabel().labelId();
+ long endLabel = pool.endLabel().labelId();
+ long totalNum = pool.totalNum();
+ long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
+ long usedNum = pool.usedNum() + applyNum;
+ ImmutableSet<LabelResource> freeLabel = ImmutableSet
+ .copyOf(releaseLabels);
+ LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+ beginLabel, endLabel,
+ totalNum, usedNum,
+ current, freeLabel);
+ resourcePool.put(deviceId, newPool);
+ log.info("success to apply label resource");
+ return result;
+ }
+
+ @Override
+ public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
+ Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
+ Set<DeviceId> deviceIdSet = maps.keySet();
+ LabelResourceRequest request = null;
+ for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
+ DeviceId deviceId = (DeviceId) it.next();
+ Device device = (Device) deviceService.getDevice(deviceId);
+ if (device == null) {
+ continue;
+ }
+ ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
+ .get(deviceId));
+ request = new LabelResourceRequest(
+ deviceId,
+ LabelResourceRequest.Type.RELEASE,
+ 0, collection);
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
+ return false;
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalRelease(request);
+ }
+
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
+
+ return complete(clusterCommunicator
+ .sendAndReceive(request,
+ LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+ return false;
+ }
+
+ private boolean internalRelease(LabelResourceRequest request) {
+ DeviceId deviceId = request.deviceId();
+ Collection<LabelResource> release = request.releaseCollection();
+ Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
+ LabelResourcePool pool = poolOld.value();
+ if (pool == null) {
+ log.info("the label resource pool of device id {} does not exist");
+ return false;
+ }
+ Set<LabelResource> storeSet = new HashSet<LabelResource>(
+ pool.releaseLabelId());
+ LabelResource labelResource = null;
+ long realReleasedNum = 0;
+ for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
+ labelResource = it.next();
+ if (labelResource.labelResourceId().labelId() < pool.beginLabel()
+ .labelId()
+ || labelResource.labelResourceId().labelId() > pool
+ .endLabel().labelId()) {
+ continue;
+ }
+ if (pool.currentUsedMaxLabelId().labelId() > labelResource
+ .labelResourceId().labelId()
+ || !storeSet.contains(labelResource)) {
+ storeSet.add(labelResource);
+ realReleasedNum++;
+ }
+ }
+ long beginNum = pool.beginLabel().labelId();
+ long endNum = pool.endLabel().labelId();
+ long totalNum = pool.totalNum();
+ long usedNum = pool.usedNum() - realReleasedNum;
+ long current = pool.currentUsedMaxLabelId().labelId();
+ ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
+ LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+ beginNum, endNum,
+ totalNum, usedNum,
+ current, s);
+ resourcePool.put(deviceId, newPool);
+ log.info("success to release label resource");
+ return true;
+ }
+
+ @Override
+ public boolean isDevicePoolFull(DeviceId deviceId) {
+ Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
+ if (pool == null) {
+ return true;
+ }
+ return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
+ && pool.value().releaseLabelId().size() == 0 ? true : false;
+ }
+
+ @Override
+ public long getFreeNumOfDevicePool(DeviceId deviceId) {
+ Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
+ if (pool == null) {
+ return 0;
+ }
+ return pool.value().endLabel().labelId()
+ - pool.value().currentUsedMaxLabelId().labelId()
+ + pool.value().releaseLabelId().size();
+ }
+
+ @Override
+ public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
+ Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
+ return pool == null ? null : pool.value();
+ }
+
+ @Override
+ public boolean destroyGlobalPool() {
+ return this.internalDestroy(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ @Override
+ public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
+ LabelResourceRequest request = new LabelResourceRequest(
+ DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+ LabelResourceRequest.Type.APPLY,
+ applyNum, null);
+ return this.internalApply(request);
+ }
+
+ @Override
+ public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
+ Set<LabelResource> set = new HashSet<LabelResource>();
+ DefaultLabelResource resource = null;
+ for (LabelResourceId labelResource : release) {
+ resource = new DefaultLabelResource(
+ DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+ labelResource);
+ set.add(resource);
+ }
+ LabelResourceRequest request = new LabelResourceRequest(
+ DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+ LabelResourceRequest.Type.APPLY,
+ 0,
+ ImmutableSet
+ .copyOf(set));
+ return this.internalRelease(request);
+ }
+
+ @Override
+ public boolean isGlobalPoolFull() {
+ return this.isDevicePoolFull(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ @Override
+ public long getFreeNumOfGlobalPool() {
+ return this.getFreeNumOfDevicePool(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ @Override
+ public LabelResourcePool getGlobalLabelResourcePool() {
+ return this.getDeviceLabelResourcePool(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ private <T> T complete(Future<T> future) {
+ try {
+ return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted while waiting for operation to complete.", e);
+ return null;
+ } catch (TimeoutException | ExecutionException e) {
+ log.error("Failed remote operation", e);
+ return null;
+ }
+ }
+}