diff options
Diffstat (limited to 'framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource')
3 files changed, 0 insertions, 599 deletions
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java deleted file mode 100644 index a7183de8..00000000 --- a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.incubator.store.resource.impl; - -import static org.onlab.util.Tools.groupedThreads; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.util.KryoNamespace; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.incubator.net.resource.label.DefaultLabelResource; -import org.onosproject.incubator.net.resource.label.LabelResource; -import org.onosproject.incubator.net.resource.label.LabelResourceDelegate; -import org.onosproject.incubator.net.resource.label.LabelResourceEvent; -import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type; -import org.onosproject.incubator.net.resource.label.LabelResourceId; -import org.onosproject.incubator.net.resource.label.LabelResourcePool; -import org.onosproject.incubator.net.resource.label.LabelResourceRequest; -import org.onosproject.incubator.net.resource.label.LabelResourceStore; -import org.onosproject.mastership.MastershipService; -import org.onosproject.net.Device; -import org.onosproject.net.DeviceId; -import org.onosproject.net.device.DeviceService; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.ClusterMessage; -import org.onosproject.store.cluster.messaging.ClusterMessageHandler; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Versioned; -import org.slf4j.Logger; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; - -/** - * Manages label resources using copycat. - */ -@Component(immediate = true, enabled = true) -@Service -public class DistributedLabelResourceStore - extends AbstractStore<LabelResourceEvent, LabelResourceDelegate> - implements LabelResourceStore { - private final Logger log = getLogger(getClass()); - - private static final String POOL_MAP_NAME = "labelresourcepool"; - - private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id"; - - private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected MastershipService mastershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected DeviceService deviceService; - - private ExecutorService messageHandlingExecutor; - private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8; - private static final long PEER_REQUEST_TIMEOUT_MS = 5000; - - private static final Serializer SERIALIZER = Serializer - .using(new KryoNamespace.Builder().register(KryoNamespaces.API) - .register(LabelResourceEvent.class) - .register(LabelResourcePool.class).register(DeviceId.class) - .register(LabelResourceRequest.class) - .register(LabelResourceRequest.Type.class) - .register(LabelResourceEvent.Type.class) - .register(DefaultLabelResource.class) - .register(LabelResourceId.class) - .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build()); - - @Activate - public void activate() { - - resourcePool = storageService - .<DeviceId, LabelResourcePool>consistentMapBuilder() - .withName(POOL_MAP_NAME).withSerializer(SERIALIZER) - .withPartitionsDisabled().build(); - messageHandlingExecutor = Executors - .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, - groupedThreads("onos/store/flow", - "message-handlers")); - clusterCommunicator - .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED, - new ClusterMessageHandler() { - - @Override - public void handle(ClusterMessage message) { - LabelResourcePool operation = SERIALIZER - .decode(message.payload()); - log.trace("received get flow entry request for {}", - operation); - boolean b = internalCreate(operation); - message.respond(SERIALIZER.encode(b)); - } - }, messageHandlingExecutor); - clusterCommunicator - .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, - new ClusterMessageHandler() { - - @Override - public void handle(ClusterMessage message) { - DeviceId deviceId = SERIALIZER - .decode(message.payload()); - log.trace("received get flow entry request for {}", - deviceId); - boolean b = internalDestroy(deviceId); - message.respond(SERIALIZER.encode(b)); - } - }, messageHandlingExecutor); - clusterCommunicator - .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY, - new ClusterMessageHandler() { - - @Override - public void handle(ClusterMessage message) { - LabelResourceRequest request = SERIALIZER - .decode(message.payload()); - log.trace("received get flow entry request for {}", - request); - final Collection<LabelResource> resource = internalApply(request); - message.respond(SERIALIZER - .encode(resource)); - } - }, messageHandlingExecutor); - clusterCommunicator - .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE, - new ClusterMessageHandler() { - - @Override - public void handle(ClusterMessage message) { - LabelResourceRequest request = SERIALIZER - .decode(message.payload()); - log.trace("received get flow entry request for {}", - request); - final boolean isSuccess = internalRelease(request); - message.respond(SERIALIZER - .encode(isSuccess)); - } - }, messageHandlingExecutor); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - clusterCommunicator - .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED); - clusterCommunicator - .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY); - clusterCommunicator - .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED); - clusterCommunicator - .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE); - messageHandlingExecutor.shutdown(); - log.info("Stopped"); - } - - @Override - public boolean createDevicePool(DeviceId deviceId, - LabelResourceId beginLabel, - LabelResourceId endLabel) { - LabelResourcePool pool = new LabelResourcePool(deviceId.toString(), - beginLabel.labelId(), - endLabel.labelId()); - return this.create(pool); - } - - @Override - public boolean createGlobalPool(LabelResourceId beginLabel, - LabelResourceId endLabel) { - LabelResourcePool pool = new LabelResourcePool(GLOBAL_RESOURCE_POOL_DEVICE_ID, - beginLabel.labelId(), - endLabel.labelId()); - return this.internalCreate(pool); - } - - private boolean create(LabelResourcePool pool) { - Device device = (Device) deviceService.getDevice(pool.deviceId()); - if (device == null) { - return false; - } - - NodeId master = mastershipService.getMasterFor(pool.deviceId()); - - if (master == null) { - log.warn("Failed to create label resource pool: No master for {}", pool); - return false; - } - - if (master.equals(clusterService.getLocalNode().id())) { - return internalCreate(pool); - } - - log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}", - master, pool.deviceId()); - - return complete(clusterCommunicator - .sendAndReceive(pool, - LabelResourceMessageSubjects.LABEL_POOL_CREATED, - SERIALIZER::encode, SERIALIZER::decode, - master)); - } - - private boolean internalCreate(LabelResourcePool pool) { - Versioned<LabelResourcePool> poolOld = resourcePool - .get(pool.deviceId()); - if (poolOld == null) { - resourcePool.put(pool.deviceId(), pool); - LabelResourceEvent event = new LabelResourceEvent(Type.POOL_CREATED, - pool); - notifyDelegate(event); - return true; - } - return false; - } - - @Override - public boolean destroyDevicePool(DeviceId deviceId) { - Device device = (Device) deviceService.getDevice(deviceId); - if (device == null) { - return false; - } - - NodeId master = mastershipService.getMasterFor(deviceId); - - if (master == null) { - log.warn("Failed to destroyDevicePool. No master for {}", deviceId); - return false; - } - - if (master.equals(clusterService.getLocalNode().id())) { - return internalDestroy(deviceId); - } - - log.trace("Forwarding request to {}, which is the primary (master) for device {}", - master, deviceId); - - return complete(clusterCommunicator - .sendAndReceive(deviceId, - LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, - SERIALIZER::encode, SERIALIZER::decode, - master)); - } - - private boolean internalDestroy(DeviceId deviceId) { - Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); - if (poolOld != null) { - resourcePool.remove(deviceId); - LabelResourceEvent event = new LabelResourceEvent(Type.POOL_DESTROYED, - poolOld.value()); - notifyDelegate(event); - } - log.info("success to destroy the label resource pool of device id {}", - deviceId); - return true; - } - - @Override - public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId, - long applyNum) { - Device device = (Device) deviceService.getDevice(deviceId); - if (device == null) { - return Collections.emptyList(); - } - LabelResourceRequest request = new LabelResourceRequest(deviceId, - LabelResourceRequest.Type.APPLY, - applyNum, null); - NodeId master = mastershipService.getMasterFor(deviceId); - - if (master == null) { - log.warn("Failed to applyFromDevicePool: No master for {}", deviceId); - return Collections.emptyList(); - } - - if (master.equals(clusterService.getLocalNode().id())) { - return internalApply(request); - } - - log.trace("Forwarding request to {}, which is the primary (master) for device {}", - master, deviceId); - - return complete(clusterCommunicator - .sendAndReceive(request, - LabelResourceMessageSubjects.LABEL_POOL_APPLY, - SERIALIZER::encode, SERIALIZER::decode, - master)); - } - - private Collection<LabelResource> internalApply(LabelResourceRequest request) { - DeviceId deviceId = request.deviceId(); - long applyNum = request.applyNum(); - Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); - LabelResourcePool pool = poolOld.value(); - Collection<LabelResource> result = new HashSet<LabelResource>(); - long freeNum = this.getFreeNumOfDevicePool(deviceId); - if (applyNum > freeNum) { - log.info("the free number of the label resource pool of deviceId {} is not enough."); - return Collections.emptyList(); - } - Set<LabelResource> releaseLabels = new HashSet<LabelResource>(pool.releaseLabelId()); - long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels - .size(); - LabelResource resource = null; - for (int i = 0; i < tmp; i++) { - Iterator<LabelResource> it = releaseLabels.iterator(); - if (it.hasNext()) { - resource = it.next(); - releaseLabels.remove(resource); - } - result.add(resource); - } - for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool - .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) { - resource = new DefaultLabelResource(deviceId, - LabelResourceId - .labelResourceId(j)); - result.add(resource); - } - long beginLabel = pool.beginLabel().labelId(); - long endLabel = pool.endLabel().labelId(); - long totalNum = pool.totalNum(); - long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp; - long usedNum = pool.usedNum() + applyNum; - ImmutableSet<LabelResource> freeLabel = ImmutableSet - .copyOf(releaseLabels); - LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(), - beginLabel, endLabel, - totalNum, usedNum, - current, freeLabel); - resourcePool.put(deviceId, newPool); - log.info("success to apply label resource"); - return result; - } - - @Override - public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) { - Map<DeviceId, Collection<LabelResource>> maps = release.asMap(); - Set<DeviceId> deviceIdSet = maps.keySet(); - LabelResourceRequest request = null; - for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) { - DeviceId deviceId = (DeviceId) it.next(); - Device device = (Device) deviceService.getDevice(deviceId); - if (device == null) { - continue; - } - ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps - .get(deviceId)); - request = new LabelResourceRequest(deviceId, - LabelResourceRequest.Type.RELEASE, - 0, collection); - NodeId master = mastershipService.getMasterFor(deviceId); - - if (master == null) { - log.warn("Failed to releaseToDevicePool: No master for {}", deviceId); - return false; - } - - if (master.equals(clusterService.getLocalNode().id())) { - return internalRelease(request); - } - - log.trace("Forwarding request to {}, which is the primary (master) for device {}", - master, deviceId); - - return complete(clusterCommunicator - .sendAndReceive(request, - LabelResourceMessageSubjects.LABEL_POOL_RELEASE, - SERIALIZER::encode, SERIALIZER::decode, - master)); - } - return false; - } - - private boolean internalRelease(LabelResourceRequest request) { - DeviceId deviceId = request.deviceId(); - Collection<LabelResource> release = request.releaseCollection(); - Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); - LabelResourcePool pool = poolOld.value(); - if (pool == null) { - log.info("the label resource pool of device id {} does not exist"); - return false; - } - Set<LabelResource> storeSet = new HashSet<LabelResource>(pool.releaseLabelId()); - LabelResource labelResource = null; - long realReleasedNum = 0; - for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) { - labelResource = it.next(); - if (labelResource.labelResourceId().labelId() < pool.beginLabel() - .labelId() - || labelResource.labelResourceId().labelId() > pool - .endLabel().labelId()) { - continue; - } - if (pool.currentUsedMaxLabelId().labelId() > labelResource - .labelResourceId().labelId() - || !storeSet.contains(labelResource)) { - storeSet.add(labelResource); - realReleasedNum++; - } - } - long beginNum = pool.beginLabel().labelId(); - long endNum = pool.endLabel().labelId(); - long totalNum = pool.totalNum(); - long usedNum = pool.usedNum() - realReleasedNum; - long current = pool.currentUsedMaxLabelId().labelId(); - ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet); - LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(), - beginNum, endNum, - totalNum, usedNum, - current, s); - resourcePool.put(deviceId, newPool); - log.info("success to release label resource"); - return true; - } - - @Override - public boolean isDevicePoolFull(DeviceId deviceId) { - Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); - if (pool == null) { - return true; - } - return pool.value().currentUsedMaxLabelId() == pool.value().endLabel() - && pool.value().releaseLabelId().size() == 0 ? true : false; - } - - @Override - public long getFreeNumOfDevicePool(DeviceId deviceId) { - Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); - if (pool == null) { - return 0; - } - return pool.value().endLabel().labelId() - - pool.value().currentUsedMaxLabelId().labelId() - + pool.value().releaseLabelId().size(); - } - - @Override - public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) { - Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); - return pool == null ? null : pool.value(); - } - - @Override - public boolean destroyGlobalPool() { - return this.internalDestroy(DeviceId - .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); - } - - @Override - public Collection<LabelResource> applyFromGlobalPool(long applyNum) { - LabelResourceRequest request = new LabelResourceRequest(DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), - LabelResourceRequest.Type.APPLY, - applyNum, null); - return this.internalApply(request); - } - - @Override - public boolean releaseToGlobalPool(Set<LabelResourceId> release) { - Set<LabelResource> set = new HashSet<LabelResource>(); - DefaultLabelResource resource = null; - for (LabelResourceId labelResource : release) { - resource = new DefaultLabelResource(DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), - labelResource); - set.add(resource); - } - LabelResourceRequest request = new LabelResourceRequest(DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), - LabelResourceRequest.Type.APPLY, - 0, - ImmutableSet.copyOf(set)); - return this.internalRelease(request); - } - - @Override - public boolean isGlobalPoolFull() { - return this.isDevicePoolFull(DeviceId - .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); - } - - @Override - public long getFreeNumOfGlobalPool() { - return this.getFreeNumOfDevicePool(DeviceId - .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); - } - - @Override - public LabelResourcePool getGlobalLabelResourcePool() { - return this.getDeviceLabelResourcePool(DeviceId - .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); - } - - private <T> T complete(Future<T> future) { - try { - return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Interrupted while waiting for operation to complete.", e); - return null; - } catch (TimeoutException | ExecutionException e) { - log.error("Failed remote operation", e); - return null; - } - } -} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java deleted file mode 100644 index 1b2f6515..00000000 --- a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.incubator.store.resource.impl; - -import org.onosproject.store.cluster.messaging.MessageSubject; - -public final class LabelResourceMessageSubjects { - - private LabelResourceMessageSubjects() { - } - public static final MessageSubject LABEL_POOL_CREATED - = new MessageSubject("label-resource-pool-created"); - public static final MessageSubject LABEL_POOL_DESTROYED - = new MessageSubject("label-resource-pool-destroyed"); - public static final MessageSubject LABEL_POOL_APPLY - = new MessageSubject("label-resource-pool-apply"); - public static final MessageSubject LABEL_POOL_RELEASE - = new MessageSubject("label-resource-pool-release"); -} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java deleted file mode 100644 index da0dc2e5..00000000 --- a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of the label resource distributed store. - */ -package org.onosproject.incubator.store.resource.impl;
\ No newline at end of file |