aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java')
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java547
1 files changed, 0 insertions, 547 deletions
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
deleted file mode 100644
index a7183de8..00000000
--- a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.incubator.store.resource.impl;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
-import org.onosproject.incubator.net.resource.label.LabelResource;
-import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
-import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
-import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
-import org.onosproject.incubator.net.resource.label.LabelResourceId;
-import org.onosproject.incubator.net.resource.label.LabelResourcePool;
-import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
-import org.onosproject.incubator.net.resource.label.LabelResourceStore;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-
-/**
- * Manages label resources using copycat.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class DistributedLabelResourceStore
- extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
- implements LabelResourceStore {
- private final Logger log = getLogger(getClass());
-
- private static final String POOL_MAP_NAME = "labelresourcepool";
-
- private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
-
- private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
-
- private ExecutorService messageHandlingExecutor;
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
- private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
-
- private static final Serializer SERIALIZER = Serializer
- .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
- .register(LabelResourceEvent.class)
- .register(LabelResourcePool.class).register(DeviceId.class)
- .register(LabelResourceRequest.class)
- .register(LabelResourceRequest.Type.class)
- .register(LabelResourceEvent.Type.class)
- .register(DefaultLabelResource.class)
- .register(LabelResourceId.class)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
-
- @Activate
- public void activate() {
-
- resourcePool = storageService
- .<DeviceId, LabelResourcePool>consistentMapBuilder()
- .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
- messageHandlingExecutor = Executors
- .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
- groupedThreads("onos/store/flow",
- "message-handlers"));
- clusterCommunicator
- .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- LabelResourcePool operation = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- operation);
- boolean b = internalCreate(operation);
- message.respond(SERIALIZER.encode(b));
- }
- }, messageHandlingExecutor);
- clusterCommunicator
- .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- deviceId);
- boolean b = internalDestroy(deviceId);
- message.respond(SERIALIZER.encode(b));
- }
- }, messageHandlingExecutor);
- clusterCommunicator
- .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- LabelResourceRequest request = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- request);
- final Collection<LabelResource> resource = internalApply(request);
- message.respond(SERIALIZER
- .encode(resource));
- }
- }, messageHandlingExecutor);
- clusterCommunicator
- .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- LabelResourceRequest request = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- request);
- final boolean isSuccess = internalRelease(request);
- message.respond(SERIALIZER
- .encode(isSuccess));
- }
- }, messageHandlingExecutor);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- clusterCommunicator
- .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
- clusterCommunicator
- .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
- clusterCommunicator
- .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
- clusterCommunicator
- .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
- messageHandlingExecutor.shutdown();
- log.info("Stopped");
- }
-
- @Override
- public boolean createDevicePool(DeviceId deviceId,
- LabelResourceId beginLabel,
- LabelResourceId endLabel) {
- LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
- beginLabel.labelId(),
- endLabel.labelId());
- return this.create(pool);
- }
-
- @Override
- public boolean createGlobalPool(LabelResourceId beginLabel,
- LabelResourceId endLabel) {
- LabelResourcePool pool = new LabelResourcePool(GLOBAL_RESOURCE_POOL_DEVICE_ID,
- beginLabel.labelId(),
- endLabel.labelId());
- return this.internalCreate(pool);
- }
-
- private boolean create(LabelResourcePool pool) {
- Device device = (Device) deviceService.getDevice(pool.deviceId());
- if (device == null) {
- return false;
- }
-
- NodeId master = mastershipService.getMasterFor(pool.deviceId());
-
- if (master == null) {
- log.warn("Failed to create label resource pool: No master for {}", pool);
- return false;
- }
-
- if (master.equals(clusterService.getLocalNode().id())) {
- return internalCreate(pool);
- }
-
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- master, pool.deviceId());
-
- return complete(clusterCommunicator
- .sendAndReceive(pool,
- LabelResourceMessageSubjects.LABEL_POOL_CREATED,
- SERIALIZER::encode, SERIALIZER::decode,
- master));
- }
-
- private boolean internalCreate(LabelResourcePool pool) {
- Versioned<LabelResourcePool> poolOld = resourcePool
- .get(pool.deviceId());
- if (poolOld == null) {
- resourcePool.put(pool.deviceId(), pool);
- LabelResourceEvent event = new LabelResourceEvent(Type.POOL_CREATED,
- pool);
- notifyDelegate(event);
- return true;
- }
- return false;
- }
-
- @Override
- public boolean destroyDevicePool(DeviceId deviceId) {
- Device device = (Device) deviceService.getDevice(deviceId);
- if (device == null) {
- return false;
- }
-
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
- return false;
- }
-
- if (master.equals(clusterService.getLocalNode().id())) {
- return internalDestroy(deviceId);
- }
-
- log.trace("Forwarding request to {}, which is the primary (master) for device {}",
- master, deviceId);
-
- return complete(clusterCommunicator
- .sendAndReceive(deviceId,
- LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
- SERIALIZER::encode, SERIALIZER::decode,
- master));
- }
-
- private boolean internalDestroy(DeviceId deviceId) {
- Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
- if (poolOld != null) {
- resourcePool.remove(deviceId);
- LabelResourceEvent event = new LabelResourceEvent(Type.POOL_DESTROYED,
- poolOld.value());
- notifyDelegate(event);
- }
- log.info("success to destroy the label resource pool of device id {}",
- deviceId);
- return true;
- }
-
- @Override
- public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
- long applyNum) {
- Device device = (Device) deviceService.getDevice(deviceId);
- if (device == null) {
- return Collections.emptyList();
- }
- LabelResourceRequest request = new LabelResourceRequest(deviceId,
- LabelResourceRequest.Type.APPLY,
- applyNum, null);
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
- return Collections.emptyList();
- }
-
- if (master.equals(clusterService.getLocalNode().id())) {
- return internalApply(request);
- }
-
- log.trace("Forwarding request to {}, which is the primary (master) for device {}",
- master, deviceId);
-
- return complete(clusterCommunicator
- .sendAndReceive(request,
- LabelResourceMessageSubjects.LABEL_POOL_APPLY,
- SERIALIZER::encode, SERIALIZER::decode,
- master));
- }
-
- private Collection<LabelResource> internalApply(LabelResourceRequest request) {
- DeviceId deviceId = request.deviceId();
- long applyNum = request.applyNum();
- Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
- LabelResourcePool pool = poolOld.value();
- Collection<LabelResource> result = new HashSet<LabelResource>();
- long freeNum = this.getFreeNumOfDevicePool(deviceId);
- if (applyNum > freeNum) {
- log.info("the free number of the label resource pool of deviceId {} is not enough.");
- return Collections.emptyList();
- }
- Set<LabelResource> releaseLabels = new HashSet<LabelResource>(pool.releaseLabelId());
- long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
- .size();
- LabelResource resource = null;
- for (int i = 0; i < tmp; i++) {
- Iterator<LabelResource> it = releaseLabels.iterator();
- if (it.hasNext()) {
- resource = it.next();
- releaseLabels.remove(resource);
- }
- result.add(resource);
- }
- for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
- .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
- resource = new DefaultLabelResource(deviceId,
- LabelResourceId
- .labelResourceId(j));
- result.add(resource);
- }
- long beginLabel = pool.beginLabel().labelId();
- long endLabel = pool.endLabel().labelId();
- long totalNum = pool.totalNum();
- long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
- long usedNum = pool.usedNum() + applyNum;
- ImmutableSet<LabelResource> freeLabel = ImmutableSet
- .copyOf(releaseLabels);
- LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
- beginLabel, endLabel,
- totalNum, usedNum,
- current, freeLabel);
- resourcePool.put(deviceId, newPool);
- log.info("success to apply label resource");
- return result;
- }
-
- @Override
- public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
- Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
- Set<DeviceId> deviceIdSet = maps.keySet();
- LabelResourceRequest request = null;
- for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
- DeviceId deviceId = (DeviceId) it.next();
- Device device = (Device) deviceService.getDevice(deviceId);
- if (device == null) {
- continue;
- }
- ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
- .get(deviceId));
- request = new LabelResourceRequest(deviceId,
- LabelResourceRequest.Type.RELEASE,
- 0, collection);
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
- return false;
- }
-
- if (master.equals(clusterService.getLocalNode().id())) {
- return internalRelease(request);
- }
-
- log.trace("Forwarding request to {}, which is the primary (master) for device {}",
- master, deviceId);
-
- return complete(clusterCommunicator
- .sendAndReceive(request,
- LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
- SERIALIZER::encode, SERIALIZER::decode,
- master));
- }
- return false;
- }
-
- private boolean internalRelease(LabelResourceRequest request) {
- DeviceId deviceId = request.deviceId();
- Collection<LabelResource> release = request.releaseCollection();
- Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
- LabelResourcePool pool = poolOld.value();
- if (pool == null) {
- log.info("the label resource pool of device id {} does not exist");
- return false;
- }
- Set<LabelResource> storeSet = new HashSet<LabelResource>(pool.releaseLabelId());
- LabelResource labelResource = null;
- long realReleasedNum = 0;
- for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
- labelResource = it.next();
- if (labelResource.labelResourceId().labelId() < pool.beginLabel()
- .labelId()
- || labelResource.labelResourceId().labelId() > pool
- .endLabel().labelId()) {
- continue;
- }
- if (pool.currentUsedMaxLabelId().labelId() > labelResource
- .labelResourceId().labelId()
- || !storeSet.contains(labelResource)) {
- storeSet.add(labelResource);
- realReleasedNum++;
- }
- }
- long beginNum = pool.beginLabel().labelId();
- long endNum = pool.endLabel().labelId();
- long totalNum = pool.totalNum();
- long usedNum = pool.usedNum() - realReleasedNum;
- long current = pool.currentUsedMaxLabelId().labelId();
- ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
- LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
- beginNum, endNum,
- totalNum, usedNum,
- current, s);
- resourcePool.put(deviceId, newPool);
- log.info("success to release label resource");
- return true;
- }
-
- @Override
- public boolean isDevicePoolFull(DeviceId deviceId) {
- Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
- if (pool == null) {
- return true;
- }
- return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
- && pool.value().releaseLabelId().size() == 0 ? true : false;
- }
-
- @Override
- public long getFreeNumOfDevicePool(DeviceId deviceId) {
- Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
- if (pool == null) {
- return 0;
- }
- return pool.value().endLabel().labelId()
- - pool.value().currentUsedMaxLabelId().labelId()
- + pool.value().releaseLabelId().size();
- }
-
- @Override
- public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
- Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
- return pool == null ? null : pool.value();
- }
-
- @Override
- public boolean destroyGlobalPool() {
- return this.internalDestroy(DeviceId
- .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
- }
-
- @Override
- public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
- LabelResourceRequest request = new LabelResourceRequest(DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
- LabelResourceRequest.Type.APPLY,
- applyNum, null);
- return this.internalApply(request);
- }
-
- @Override
- public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
- Set<LabelResource> set = new HashSet<LabelResource>();
- DefaultLabelResource resource = null;
- for (LabelResourceId labelResource : release) {
- resource = new DefaultLabelResource(DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
- labelResource);
- set.add(resource);
- }
- LabelResourceRequest request = new LabelResourceRequest(DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
- LabelResourceRequest.Type.APPLY,
- 0,
- ImmutableSet.copyOf(set));
- return this.internalRelease(request);
- }
-
- @Override
- public boolean isGlobalPoolFull() {
- return this.isDevicePoolFull(DeviceId
- .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
- }
-
- @Override
- public long getFreeNumOfGlobalPool() {
- return this.getFreeNumOfDevicePool(DeviceId
- .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
- }
-
- @Override
- public LabelResourcePool getGlobalLabelResourcePool() {
- return this.getDeviceLabelResourcePool(DeviceId
- .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
- }
-
- private <T> T complete(Future<T> future) {
- try {
- return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Interrupted while waiting for operation to complete.", e);
- return null;
- } catch (TimeoutException | ExecutionException e) {
- log.error("Failed remote operation", e);
- return null;
- }
- }
-}