diff options
Diffstat (limited to 'framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java')
-rw-r--r-- | framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java | 543 |
1 files changed, 543 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java new file mode 100644 index 00000000..a014504c --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java @@ -0,0 +1,543 @@ +package org.onosproject.incubator.store.resource.impl; + +import static org.onlab.util.Tools.groupedThreads; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.incubator.net.resource.label.DefaultLabelResource; +import org.onosproject.incubator.net.resource.label.LabelResource; +import org.onosproject.incubator.net.resource.label.LabelResourceDelegate; +import org.onosproject.incubator.net.resource.label.LabelResourceEvent; +import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type; +import org.onosproject.incubator.net.resource.label.LabelResourceId; +import org.onosproject.incubator.net.resource.label.LabelResourcePool; +import org.onosproject.incubator.net.resource.label.LabelResourceRequest; +import org.onosproject.incubator.net.resource.label.LabelResourceStore; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.device.DeviceService; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterMessage; +import org.onosproject.store.cluster.messaging.ClusterMessageHandler; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; + +/** + * Manages label resources using copycat. + */ +@Component(immediate = true, enabled = true) +@Service +public class DistributedLabelResourceStore + extends AbstractStore<LabelResourceEvent, LabelResourceDelegate> + implements LabelResourceStore { + private final Logger log = getLogger(getClass()); + + private static final String POOL_MAP_NAME = "labelresourcepool"; + + private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id"; + + private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + + private ExecutorService messageHandlingExecutor; + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8; + private static final long PEER_REQUEST_TIMEOUT_MS = 5000; + + private static final Serializer SERIALIZER = Serializer + .using(new KryoNamespace.Builder().register(KryoNamespaces.API) + .register(LabelResourceEvent.class) + .register(LabelResourcePool.class).register(DeviceId.class) + .register(LabelResourceRequest.class) + .register(LabelResourceRequest.Type.class) + .register(LabelResourceEvent.Type.class) + .register(DefaultLabelResource.class) + .register(LabelResourceId.class) + .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build()); + + @Activate + public void activate() { + + resourcePool = storageService + .<DeviceId, LabelResourcePool>consistentMapBuilder() + .withName(POOL_MAP_NAME).withSerializer(SERIALIZER) + .withPartitionsDisabled().build(); + messageHandlingExecutor = Executors + .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, + groupedThreads("onos/store/flow", + "message-handlers")); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + LabelResourcePool operation = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + operation); + boolean b = internalCreate(operation); + message.respond(SERIALIZER.encode(b)); + } + }, messageHandlingExecutor); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + DeviceId deviceId = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + deviceId); + boolean b = internalDestroy(deviceId); + message.respond(SERIALIZER.encode(b)); + } + }, messageHandlingExecutor); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + LabelResourceRequest request = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + request); + final Collection<LabelResource> resource = internalApply(request); + message.respond(SERIALIZER + .encode(resource)); + } + }, messageHandlingExecutor); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + LabelResourceRequest request = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + request); + final boolean isSuccess = internalRelease(request); + message.respond(SERIALIZER + .encode(isSuccess)); + } + }, messageHandlingExecutor); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED); + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY); + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED); + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE); + messageHandlingExecutor.shutdown(); + log.info("Stopped"); + } + + @Override + public boolean createDevicePool(DeviceId deviceId, + LabelResourceId beginLabel, + LabelResourceId endLabel) { + LabelResourcePool pool = new LabelResourcePool(deviceId.toString(), + beginLabel.labelId(), + endLabel.labelId()); + return this.create(pool); + } + + @Override + public boolean createGlobalPool(LabelResourceId beginLabel, + LabelResourceId endLabel) { + LabelResourcePool pool = new LabelResourcePool( + GLOBAL_RESOURCE_POOL_DEVICE_ID, + beginLabel.labelId(), + endLabel.labelId()); + return this.internalCreate(pool); + } + + private boolean create(LabelResourcePool pool) { + Device device = (Device) deviceService.getDevice(pool.deviceId()); + if (device == null) { + return false; + } + + NodeId master = mastershipService.getMasterFor(pool.deviceId()); + + if (master == null) { + log.warn("Failed to create label resource pool: No master for {}", pool); + return false; + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalCreate(pool); + } + + log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}", + master, pool.deviceId()); + + return complete(clusterCommunicator + .sendAndReceive(pool, + LabelResourceMessageSubjects.LABEL_POOL_CREATED, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + + private boolean internalCreate(LabelResourcePool pool) { + Versioned<LabelResourcePool> poolOld = resourcePool + .get(pool.deviceId()); + if (poolOld == null) { + resourcePool.put(pool.deviceId(), pool); + LabelResourceEvent event = new LabelResourceEvent( + Type.POOL_CREATED, + pool); + notifyDelegate(event); + return true; + } + return false; + } + + @Override + public boolean destroyDevicePool(DeviceId deviceId) { + Device device = (Device) deviceService.getDevice(deviceId); + if (device == null) { + return false; + } + + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.warn("Failed to destroyDevicePool. No master for {}", deviceId); + return false; + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalDestroy(deviceId); + } + + log.trace("Forwarding request to {}, which is the primary (master) for device {}", + master, deviceId); + + return complete(clusterCommunicator + .sendAndReceive(deviceId, + LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + + private boolean internalDestroy(DeviceId deviceId) { + Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); + if (poolOld != null) { + resourcePool.remove(deviceId); + LabelResourceEvent event = new LabelResourceEvent( + Type.POOL_CREATED, + poolOld.value()); + notifyDelegate(event); + } + log.info("success to destroy the label resource pool of device id {}", + deviceId); + return true; + } + + @Override + public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId, + long applyNum) { + Device device = (Device) deviceService.getDevice(deviceId); + if (device == null) { + return Collections.emptyList(); + } + LabelResourceRequest request = new LabelResourceRequest( + deviceId, + LabelResourceRequest.Type.APPLY, + applyNum, null); + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.warn("Failed to applyFromDevicePool: No master for {}", deviceId); + return Collections.emptyList(); + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalApply(request); + } + + log.trace("Forwarding request to {}, which is the primary (master) for device {}", + master, deviceId); + + return complete(clusterCommunicator + .sendAndReceive(request, + LabelResourceMessageSubjects.LABEL_POOL_APPLY, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + + private Collection<LabelResource> internalApply(LabelResourceRequest request) { + DeviceId deviceId = request.deviceId(); + long applyNum = request.applyNum(); + Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); + LabelResourcePool pool = poolOld.value(); + Collection<LabelResource> result = new HashSet<LabelResource>(); + long freeNum = this.getFreeNumOfDevicePool(deviceId); + if (applyNum > freeNum) { + log.info("the free number of the label resource pool of deviceId {} is not enough."); + return Collections.emptyList(); + } + Set<LabelResource> releaseLabels = new HashSet<LabelResource>( + pool.releaseLabelId()); + long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels + .size(); + LabelResource resource = null; + for (int i = 0; i < tmp; i++) { + Iterator<LabelResource> it = releaseLabels.iterator(); + if (it.hasNext()) { + resource = it.next(); + releaseLabels.remove(resource); + } + result.add(resource); + } + for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool + .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) { + resource = new DefaultLabelResource(deviceId, + LabelResourceId + .labelResourceId(j)); + result.add(resource); + } + long beginLabel = pool.beginLabel().labelId(); + long endLabel = pool.endLabel().labelId(); + long totalNum = pool.totalNum(); + long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp; + long usedNum = pool.usedNum() + applyNum; + ImmutableSet<LabelResource> freeLabel = ImmutableSet + .copyOf(releaseLabels); + LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(), + beginLabel, endLabel, + totalNum, usedNum, + current, freeLabel); + resourcePool.put(deviceId, newPool); + log.info("success to apply label resource"); + return result; + } + + @Override + public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) { + Map<DeviceId, Collection<LabelResource>> maps = release.asMap(); + Set<DeviceId> deviceIdSet = maps.keySet(); + LabelResourceRequest request = null; + for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) { + DeviceId deviceId = (DeviceId) it.next(); + Device device = (Device) deviceService.getDevice(deviceId); + if (device == null) { + continue; + } + ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps + .get(deviceId)); + request = new LabelResourceRequest( + deviceId, + LabelResourceRequest.Type.RELEASE, + 0, collection); + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.warn("Failed to releaseToDevicePool: No master for {}", deviceId); + return false; + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalRelease(request); + } + + log.trace("Forwarding request to {}, which is the primary (master) for device {}", + master, deviceId); + + return complete(clusterCommunicator + .sendAndReceive(request, + LabelResourceMessageSubjects.LABEL_POOL_RELEASE, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + return false; + } + + private boolean internalRelease(LabelResourceRequest request) { + DeviceId deviceId = request.deviceId(); + Collection<LabelResource> release = request.releaseCollection(); + Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); + LabelResourcePool pool = poolOld.value(); + if (pool == null) { + log.info("the label resource pool of device id {} does not exist"); + return false; + } + Set<LabelResource> storeSet = new HashSet<LabelResource>( + pool.releaseLabelId()); + LabelResource labelResource = null; + long realReleasedNum = 0; + for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) { + labelResource = it.next(); + if (labelResource.labelResourceId().labelId() < pool.beginLabel() + .labelId() + || labelResource.labelResourceId().labelId() > pool + .endLabel().labelId()) { + continue; + } + if (pool.currentUsedMaxLabelId().labelId() > labelResource + .labelResourceId().labelId() + || !storeSet.contains(labelResource)) { + storeSet.add(labelResource); + realReleasedNum++; + } + } + long beginNum = pool.beginLabel().labelId(); + long endNum = pool.endLabel().labelId(); + long totalNum = pool.totalNum(); + long usedNum = pool.usedNum() - realReleasedNum; + long current = pool.currentUsedMaxLabelId().labelId(); + ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet); + LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(), + beginNum, endNum, + totalNum, usedNum, + current, s); + resourcePool.put(deviceId, newPool); + log.info("success to release label resource"); + return true; + } + + @Override + public boolean isDevicePoolFull(DeviceId deviceId) { + Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); + if (pool == null) { + return true; + } + return pool.value().currentUsedMaxLabelId() == pool.value().endLabel() + && pool.value().releaseLabelId().size() == 0 ? true : false; + } + + @Override + public long getFreeNumOfDevicePool(DeviceId deviceId) { + Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); + if (pool == null) { + return 0; + } + return pool.value().endLabel().labelId() + - pool.value().currentUsedMaxLabelId().labelId() + + pool.value().releaseLabelId().size(); + } + + @Override + public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) { + Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); + return pool == null ? null : pool.value(); + } + + @Override + public boolean destroyGlobalPool() { + return this.internalDestroy(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + @Override + public Collection<LabelResource> applyFromGlobalPool(long applyNum) { + LabelResourceRequest request = new LabelResourceRequest( + DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), + LabelResourceRequest.Type.APPLY, + applyNum, null); + return this.internalApply(request); + } + + @Override + public boolean releaseToGlobalPool(Set<LabelResourceId> release) { + Set<LabelResource> set = new HashSet<LabelResource>(); + DefaultLabelResource resource = null; + for (LabelResourceId labelResource : release) { + resource = new DefaultLabelResource( + DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), + labelResource); + set.add(resource); + } + LabelResourceRequest request = new LabelResourceRequest( + DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), + LabelResourceRequest.Type.APPLY, + 0, + ImmutableSet + .copyOf(set)); + return this.internalRelease(request); + } + + @Override + public boolean isGlobalPoolFull() { + return this.isDevicePoolFull(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + @Override + public long getFreeNumOfGlobalPool() { + return this.getFreeNumOfDevicePool(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + @Override + public LabelResourcePool getGlobalLabelResourcePool() { + return this.getDeviceLabelResourcePool(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + private <T> T complete(Future<T> future) { + try { + return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Interrupted while waiting for operation to complete.", e); + return null; + } catch (TimeoutException | ExecutionException e) { + log.error("Failed remote operation", e); + return null; + } + } +} |