diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:15:21 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:15:21 -0700 |
commit | 13d05bc8458758ee39cb829098241e89616717ee (patch) | |
tree | 22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/incubator/store | |
parent | 6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff) |
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/incubator/store')
9 files changed, 1551 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/store/pom.xml b/framework/src/onos/incubator/store/pom.xml new file mode 100644 index 00000000..54575582 --- /dev/null +++ b/framework/src/onos/incubator/store/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + ~ Copyright 2015 Open Networking Laboratory + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onosproject</groupId> + <artifactId>onos-incubator</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>onos-incubator-store</artifactId> + <packaging>bundle</packaging> + + <description>ONOS incubating distributed store subsystems</description> + + <dependencies> + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onos-incubator-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onos-incubator-api</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onos-core-dist</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava-testlib</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onos-api</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.felix</groupId> + <artifactId>org.apache.felix.scr.annotations</artifactId> + </dependency> + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-scr-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java new file mode 100644 index 00000000..2755a98f --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Incubating distributed store implementations. + */ +package org.onosproject.incubator.store.impl;
\ No newline at end of file diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java new file mode 100644 index 00000000..32890cb1 --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java @@ -0,0 +1,258 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.store.meter.impl; + +import com.google.common.collect.Collections2; +import com.google.common.collect.Maps; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.meter.Band; +import org.onosproject.net.meter.DefaultBand; +import org.onosproject.net.meter.DefaultMeter; +import org.onosproject.net.meter.Meter; +import org.onosproject.net.meter.MeterEvent; +import org.onosproject.net.meter.MeterFailReason; +import org.onosproject.net.meter.MeterId; +import org.onosproject.net.meter.MeterOperation; +import org.onosproject.net.meter.MeterState; +import org.onosproject.net.meter.MeterStore; +import org.onosproject.net.meter.MeterStoreDelegate; +import org.onosproject.net.meter.MeterStoreResult; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageException; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * A distributed meter store implementation. Meters are stored consistently + * across the cluster. + */ +@Component(immediate = true) +@Service +public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate> + implements MeterStore { + + private Logger log = getLogger(getClass()); + + private static final String METERSTORE = "onos-meter-store"; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private ClusterService clusterService; + + private ConsistentMap<MeterId, MeterData> meters; + private NodeId local; + + private MapEventListener mapListener = new InternalMapEventListener(); + + private Map<MeterId, CompletableFuture<MeterStoreResult>> futures = + Maps.newConcurrentMap(); + + @Activate + public void activate() { + + local = clusterService.getLocalNode().id(); + + + meters = storageService.<MeterId, MeterData>consistentMapBuilder() + .withName(METERSTORE) + .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), + MeterData.class, + DefaultMeter.class, + DefaultBand.class, + Band.Type.class, + MeterState.class, + Meter.Unit.class, + MeterFailReason.class, + MeterId.class)).build(); + + meters.addListener(mapListener); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + + meters.removeListener(mapListener); + log.info("Stopped"); + } + + + @Override + public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) { + CompletableFuture<MeterStoreResult> future = new CompletableFuture<>(); + futures.put(meter.id(), future); + MeterData data = new MeterData(meter, null, local); + + try { + meters.put(meter.id(), data); + } catch (StorageException e) { + future.completeExceptionally(e); + } + + return future; + + } + + @Override + public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) { + CompletableFuture<MeterStoreResult> future = new CompletableFuture<>(); + futures.put(meter.id(), future); + + MeterData data = new MeterData(meter, null, local); + + // update the state of the meter. It will be pruned by observing + // that it has been removed from the dataplane. + try { + if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) { + future.complete(MeterStoreResult.success()); + } + } catch (StorageException e) { + future.completeExceptionally(e); + } + + + return future; + } + + @Override + public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) { + CompletableFuture<MeterStoreResult> future = new CompletableFuture<>(); + futures.put(meter.id(), future); + + MeterData data = new MeterData(meter, null, local); + try { + if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) { + future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER)); + } + } catch (StorageException e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public void updateMeterState(Meter meter) { + meters.computeIfPresent(meter.id(), (id, v) -> { + DefaultMeter m = (DefaultMeter) v.meter(); + m.setState(meter.state()); + m.setProcessedPackets(meter.packetsSeen()); + m.setProcessedBytes(meter.bytesSeen()); + m.setLife(meter.life()); + // TODO: Prune if drops to zero. + m.setReferenceCount(meter.referenceCount()); + return new MeterData(m, null, v.origin()); + }); + } + + @Override + public Meter getMeter(MeterId meterId) { + MeterData data = Versioned.valueOrElse(meters.get(meterId), null); + return data == null ? null : data.meter(); + } + + @Override + public Collection<Meter> getAllMeters() { + return Collections2.transform(meters.asJavaMap().values(), + MeterData::meter); + } + + @Override + public void failedMeter(MeterOperation op, MeterFailReason reason) { + meters.computeIfPresent(op.meter().id(), (k, v) -> + new MeterData(v.meter(), reason, v.origin())); + } + + @Override + public void deleteMeterNow(Meter m) { + futures.remove(m.id()); + meters.remove(m.id()); + } + + private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> { + @Override + public void event(MapEvent<MeterId, MeterData> event) { + MeterData data = event.value().value(); + NodeId master = mastershipService.getMasterFor(data.meter().deviceId()); + switch (event.type()) { + case INSERT: + case UPDATE: + switch (data.meter().state()) { + case PENDING_ADD: + case PENDING_REMOVE: + if (!data.reason().isPresent() && local.equals(master)) { + notifyDelegate( + new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ? + MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, + data.meter())); + } else if (data.reason().isPresent() && local.equals(data.origin())) { + MeterStoreResult msr = MeterStoreResult.fail(data.reason().get()); + //TODO: No future -> no friend + futures.get(data.meter().id()).complete(msr); + } + break; + case ADDED: + if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) { + futures.remove(data.meter().id()).complete(MeterStoreResult.success()); + } + break; + case REMOVED: + if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) { + futures.remove(data.meter().id()).complete(MeterStoreResult.success()); + } + break; + default: + log.warn("Unknown meter state type {}", data.meter().state()); + } + break; + case REMOVE: + //Only happens at origin so we do not need to care. + break; + default: + log.warn("Unknown Map event type {}", event.type()); + } + + } + } + + +} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java new file mode 100644 index 00000000..c72bc2e3 --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java @@ -0,0 +1,52 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.store.meter.impl; + +import org.onosproject.cluster.NodeId; +import org.onosproject.net.meter.Meter; +import org.onosproject.net.meter.MeterFailReason; + +import java.util.Optional; + +/** + * A class representing the meter information stored in the meter store. + */ +public class MeterData { + + private final Meter meter; + private final Optional<MeterFailReason> reason; + private final NodeId origin; + + public MeterData(Meter meter, MeterFailReason reason, NodeId origin) { + this.meter = meter; + this.reason = Optional.ofNullable(reason); + this.origin = origin; + } + + public Meter meter() { + return meter; + } + + public Optional<MeterFailReason> reason() { + return this.reason; + } + + public NodeId origin() { + return this.origin; + } + + +} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java new file mode 100644 index 00000000..a014504c --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java @@ -0,0 +1,543 @@ +package org.onosproject.incubator.store.resource.impl; + +import static org.onlab.util.Tools.groupedThreads; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.incubator.net.resource.label.DefaultLabelResource; +import org.onosproject.incubator.net.resource.label.LabelResource; +import org.onosproject.incubator.net.resource.label.LabelResourceDelegate; +import org.onosproject.incubator.net.resource.label.LabelResourceEvent; +import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type; +import org.onosproject.incubator.net.resource.label.LabelResourceId; +import org.onosproject.incubator.net.resource.label.LabelResourcePool; +import org.onosproject.incubator.net.resource.label.LabelResourceRequest; +import org.onosproject.incubator.net.resource.label.LabelResourceStore; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.Device; +import org.onosproject.net.DeviceId; +import org.onosproject.net.device.DeviceService; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterMessage; +import org.onosproject.store.cluster.messaging.ClusterMessageHandler; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; + +/** + * Manages label resources using copycat. + */ +@Component(immediate = true, enabled = true) +@Service +public class DistributedLabelResourceStore + extends AbstractStore<LabelResourceEvent, LabelResourceDelegate> + implements LabelResourceStore { + private final Logger log = getLogger(getClass()); + + private static final String POOL_MAP_NAME = "labelresourcepool"; + + private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id"; + + private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + + private ExecutorService messageHandlingExecutor; + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8; + private static final long PEER_REQUEST_TIMEOUT_MS = 5000; + + private static final Serializer SERIALIZER = Serializer + .using(new KryoNamespace.Builder().register(KryoNamespaces.API) + .register(LabelResourceEvent.class) + .register(LabelResourcePool.class).register(DeviceId.class) + .register(LabelResourceRequest.class) + .register(LabelResourceRequest.Type.class) + .register(LabelResourceEvent.Type.class) + .register(DefaultLabelResource.class) + .register(LabelResourceId.class) + .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build()); + + @Activate + public void activate() { + + resourcePool = storageService + .<DeviceId, LabelResourcePool>consistentMapBuilder() + .withName(POOL_MAP_NAME).withSerializer(SERIALIZER) + .withPartitionsDisabled().build(); + messageHandlingExecutor = Executors + .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, + groupedThreads("onos/store/flow", + "message-handlers")); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + LabelResourcePool operation = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + operation); + boolean b = internalCreate(operation); + message.respond(SERIALIZER.encode(b)); + } + }, messageHandlingExecutor); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + DeviceId deviceId = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + deviceId); + boolean b = internalDestroy(deviceId); + message.respond(SERIALIZER.encode(b)); + } + }, messageHandlingExecutor); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + LabelResourceRequest request = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + request); + final Collection<LabelResource> resource = internalApply(request); + message.respond(SERIALIZER + .encode(resource)); + } + }, messageHandlingExecutor); + clusterCommunicator + .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE, + new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + LabelResourceRequest request = SERIALIZER + .decode(message.payload()); + log.trace("received get flow entry request for {}", + request); + final boolean isSuccess = internalRelease(request); + message.respond(SERIALIZER + .encode(isSuccess)); + } + }, messageHandlingExecutor); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED); + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY); + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED); + clusterCommunicator + .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE); + messageHandlingExecutor.shutdown(); + log.info("Stopped"); + } + + @Override + public boolean createDevicePool(DeviceId deviceId, + LabelResourceId beginLabel, + LabelResourceId endLabel) { + LabelResourcePool pool = new LabelResourcePool(deviceId.toString(), + beginLabel.labelId(), + endLabel.labelId()); + return this.create(pool); + } + + @Override + public boolean createGlobalPool(LabelResourceId beginLabel, + LabelResourceId endLabel) { + LabelResourcePool pool = new LabelResourcePool( + GLOBAL_RESOURCE_POOL_DEVICE_ID, + beginLabel.labelId(), + endLabel.labelId()); + return this.internalCreate(pool); + } + + private boolean create(LabelResourcePool pool) { + Device device = (Device) deviceService.getDevice(pool.deviceId()); + if (device == null) { + return false; + } + + NodeId master = mastershipService.getMasterFor(pool.deviceId()); + + if (master == null) { + log.warn("Failed to create label resource pool: No master for {}", pool); + return false; + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalCreate(pool); + } + + log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}", + master, pool.deviceId()); + + return complete(clusterCommunicator + .sendAndReceive(pool, + LabelResourceMessageSubjects.LABEL_POOL_CREATED, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + + private boolean internalCreate(LabelResourcePool pool) { + Versioned<LabelResourcePool> poolOld = resourcePool + .get(pool.deviceId()); + if (poolOld == null) { + resourcePool.put(pool.deviceId(), pool); + LabelResourceEvent event = new LabelResourceEvent( + Type.POOL_CREATED, + pool); + notifyDelegate(event); + return true; + } + return false; + } + + @Override + public boolean destroyDevicePool(DeviceId deviceId) { + Device device = (Device) deviceService.getDevice(deviceId); + if (device == null) { + return false; + } + + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.warn("Failed to destroyDevicePool. No master for {}", deviceId); + return false; + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalDestroy(deviceId); + } + + log.trace("Forwarding request to {}, which is the primary (master) for device {}", + master, deviceId); + + return complete(clusterCommunicator + .sendAndReceive(deviceId, + LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + + private boolean internalDestroy(DeviceId deviceId) { + Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); + if (poolOld != null) { + resourcePool.remove(deviceId); + LabelResourceEvent event = new LabelResourceEvent( + Type.POOL_CREATED, + poolOld.value()); + notifyDelegate(event); + } + log.info("success to destroy the label resource pool of device id {}", + deviceId); + return true; + } + + @Override + public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId, + long applyNum) { + Device device = (Device) deviceService.getDevice(deviceId); + if (device == null) { + return Collections.emptyList(); + } + LabelResourceRequest request = new LabelResourceRequest( + deviceId, + LabelResourceRequest.Type.APPLY, + applyNum, null); + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.warn("Failed to applyFromDevicePool: No master for {}", deviceId); + return Collections.emptyList(); + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalApply(request); + } + + log.trace("Forwarding request to {}, which is the primary (master) for device {}", + master, deviceId); + + return complete(clusterCommunicator + .sendAndReceive(request, + LabelResourceMessageSubjects.LABEL_POOL_APPLY, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + + private Collection<LabelResource> internalApply(LabelResourceRequest request) { + DeviceId deviceId = request.deviceId(); + long applyNum = request.applyNum(); + Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); + LabelResourcePool pool = poolOld.value(); + Collection<LabelResource> result = new HashSet<LabelResource>(); + long freeNum = this.getFreeNumOfDevicePool(deviceId); + if (applyNum > freeNum) { + log.info("the free number of the label resource pool of deviceId {} is not enough."); + return Collections.emptyList(); + } + Set<LabelResource> releaseLabels = new HashSet<LabelResource>( + pool.releaseLabelId()); + long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels + .size(); + LabelResource resource = null; + for (int i = 0; i < tmp; i++) { + Iterator<LabelResource> it = releaseLabels.iterator(); + if (it.hasNext()) { + resource = it.next(); + releaseLabels.remove(resource); + } + result.add(resource); + } + for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool + .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) { + resource = new DefaultLabelResource(deviceId, + LabelResourceId + .labelResourceId(j)); + result.add(resource); + } + long beginLabel = pool.beginLabel().labelId(); + long endLabel = pool.endLabel().labelId(); + long totalNum = pool.totalNum(); + long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp; + long usedNum = pool.usedNum() + applyNum; + ImmutableSet<LabelResource> freeLabel = ImmutableSet + .copyOf(releaseLabels); + LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(), + beginLabel, endLabel, + totalNum, usedNum, + current, freeLabel); + resourcePool.put(deviceId, newPool); + log.info("success to apply label resource"); + return result; + } + + @Override + public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) { + Map<DeviceId, Collection<LabelResource>> maps = release.asMap(); + Set<DeviceId> deviceIdSet = maps.keySet(); + LabelResourceRequest request = null; + for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) { + DeviceId deviceId = (DeviceId) it.next(); + Device device = (Device) deviceService.getDevice(deviceId); + if (device == null) { + continue; + } + ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps + .get(deviceId)); + request = new LabelResourceRequest( + deviceId, + LabelResourceRequest.Type.RELEASE, + 0, collection); + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.warn("Failed to releaseToDevicePool: No master for {}", deviceId); + return false; + } + + if (master.equals(clusterService.getLocalNode().id())) { + return internalRelease(request); + } + + log.trace("Forwarding request to {}, which is the primary (master) for device {}", + master, deviceId); + + return complete(clusterCommunicator + .sendAndReceive(request, + LabelResourceMessageSubjects.LABEL_POOL_RELEASE, + SERIALIZER::encode, SERIALIZER::decode, + master)); + } + return false; + } + + private boolean internalRelease(LabelResourceRequest request) { + DeviceId deviceId = request.deviceId(); + Collection<LabelResource> release = request.releaseCollection(); + Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId); + LabelResourcePool pool = poolOld.value(); + if (pool == null) { + log.info("the label resource pool of device id {} does not exist"); + return false; + } + Set<LabelResource> storeSet = new HashSet<LabelResource>( + pool.releaseLabelId()); + LabelResource labelResource = null; + long realReleasedNum = 0; + for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) { + labelResource = it.next(); + if (labelResource.labelResourceId().labelId() < pool.beginLabel() + .labelId() + || labelResource.labelResourceId().labelId() > pool + .endLabel().labelId()) { + continue; + } + if (pool.currentUsedMaxLabelId().labelId() > labelResource + .labelResourceId().labelId() + || !storeSet.contains(labelResource)) { + storeSet.add(labelResource); + realReleasedNum++; + } + } + long beginNum = pool.beginLabel().labelId(); + long endNum = pool.endLabel().labelId(); + long totalNum = pool.totalNum(); + long usedNum = pool.usedNum() - realReleasedNum; + long current = pool.currentUsedMaxLabelId().labelId(); + ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet); + LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(), + beginNum, endNum, + totalNum, usedNum, + current, s); + resourcePool.put(deviceId, newPool); + log.info("success to release label resource"); + return true; + } + + @Override + public boolean isDevicePoolFull(DeviceId deviceId) { + Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); + if (pool == null) { + return true; + } + return pool.value().currentUsedMaxLabelId() == pool.value().endLabel() + && pool.value().releaseLabelId().size() == 0 ? true : false; + } + + @Override + public long getFreeNumOfDevicePool(DeviceId deviceId) { + Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); + if (pool == null) { + return 0; + } + return pool.value().endLabel().labelId() + - pool.value().currentUsedMaxLabelId().labelId() + + pool.value().releaseLabelId().size(); + } + + @Override + public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) { + Versioned<LabelResourcePool> pool = resourcePool.get(deviceId); + return pool == null ? null : pool.value(); + } + + @Override + public boolean destroyGlobalPool() { + return this.internalDestroy(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + @Override + public Collection<LabelResource> applyFromGlobalPool(long applyNum) { + LabelResourceRequest request = new LabelResourceRequest( + DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), + LabelResourceRequest.Type.APPLY, + applyNum, null); + return this.internalApply(request); + } + + @Override + public boolean releaseToGlobalPool(Set<LabelResourceId> release) { + Set<LabelResource> set = new HashSet<LabelResource>(); + DefaultLabelResource resource = null; + for (LabelResourceId labelResource : release) { + resource = new DefaultLabelResource( + DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), + labelResource); + set.add(resource); + } + LabelResourceRequest request = new LabelResourceRequest( + DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID), + LabelResourceRequest.Type.APPLY, + 0, + ImmutableSet + .copyOf(set)); + return this.internalRelease(request); + } + + @Override + public boolean isGlobalPoolFull() { + return this.isDevicePoolFull(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + @Override + public long getFreeNumOfGlobalPool() { + return this.getFreeNumOfDevicePool(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + @Override + public LabelResourcePool getGlobalLabelResourcePool() { + return this.getDeviceLabelResourcePool(DeviceId + .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID)); + } + + private <T> T complete(Future<T> future) { + try { + return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Interrupted while waiting for operation to complete.", e); + return null; + } catch (TimeoutException | ExecutionException e) { + log.error("Failed remote operation", e); + return null; + } + } +} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java new file mode 100644 index 00000000..0a6f1640 --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java @@ -0,0 +1,17 @@ +package org.onosproject.incubator.store.resource.impl; + +import org.onosproject.store.cluster.messaging.MessageSubject; + +public final class LabelResourceMessageSubjects { + + private LabelResourceMessageSubjects() { + } + public static final MessageSubject LABEL_POOL_CREATED + = new MessageSubject("label-resource-pool-created"); + public static final MessageSubject LABEL_POOL_DESTROYED + = new MessageSubject("label-resource-pool-destroyed"); + public static final MessageSubject LABEL_POOL_APPLY + = new MessageSubject("label-resource-pool-apply"); + public static final MessageSubject LABEL_POOL_RELEASE + = new MessageSubject("label-resource-pool-release"); +} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java new file mode 100644 index 00000000..da0dc2e5 --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the label resource distributed store. + */ +package org.onosproject.incubator.store.resource.impl;
\ No newline at end of file diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java new file mode 100644 index 00000000..78c6468e --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java @@ -0,0 +1,532 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.incubator.store.tunnel.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.CoreService; +import org.onosproject.core.IdGenerator; +import org.onosproject.incubator.net.tunnel.DefaultTunnel; +import org.onosproject.incubator.net.tunnel.Tunnel; +import org.onosproject.incubator.net.tunnel.Tunnel.Type; +import org.onosproject.incubator.net.tunnel.TunnelEndPoint; +import org.onosproject.incubator.net.tunnel.TunnelEvent; +import org.onosproject.incubator.net.tunnel.TunnelId; +import org.onosproject.incubator.net.tunnel.TunnelName; +import org.onosproject.incubator.net.tunnel.TunnelStore; +import org.onosproject.incubator.net.tunnel.TunnelStoreDelegate; +import org.onosproject.incubator.net.tunnel.TunnelSubscription; +import org.onosproject.net.Annotations; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.app.GossipApplicationStore.InternalState; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.MultiValuedTimestamp; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.WallClockTimestamp; +import org.slf4j.Logger; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableSet; + +/** + * Manages inventory of tunnel in distributed data store that uses optimistic + * replication and gossip based techniques. + */ +@Component(immediate = true) +@Service +public class DistributedTunnelStore + extends AbstractStore<TunnelEvent, TunnelStoreDelegate> + implements TunnelStore { + + private final Logger log = getLogger(getClass()); + + /** + * The topic used for obtaining globally unique ids. + */ + private String runnelOpTopoic = "tunnel-ops-ids"; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected CoreService coreService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + // tunnel identity as map key in the store. + private EventuallyConsistentMap<TunnelId, Tunnel> tunnelIdAsKeyStore; + // tunnel name as map key in the store. + private EventuallyConsistentMap<TunnelName, Set<TunnelId>> tunnelNameAsKeyStore; + // maintains all the tunnels between source and destination. + private EventuallyConsistentMap<TunnelKey, Set<TunnelId>> srcAndDstKeyStore; + // maintains all the tunnels by tunnel type. + private EventuallyConsistentMap<Tunnel.Type, Set<TunnelId>> typeKeyStore; + // maintains records that app subscribes tunnel. + private EventuallyConsistentMap<ApplicationId, Set<TunnelSubscription>> orderRelationship; + + private IdGenerator idGenerator; + + @Activate + public void activate() { + KryoNamespace.Builder serializer = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(MultiValuedTimestamp.class) + .register(InternalState.class); + tunnelIdAsKeyStore = storageService + .<TunnelId, Tunnel>eventuallyConsistentMapBuilder() + .withName("all_tunnel").withSerializer(serializer) + .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); + tunnelNameAsKeyStore = storageService + .<TunnelName, Set<TunnelId>>eventuallyConsistentMapBuilder() + .withName("tunnel_name_tunnel").withSerializer(serializer) + .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); + srcAndDstKeyStore = storageService + .<TunnelKey, Set<TunnelId>>eventuallyConsistentMapBuilder() + .withName("src_dst_tunnel").withSerializer(serializer) + .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); + typeKeyStore = storageService + .<Tunnel.Type, Set<TunnelId>>eventuallyConsistentMapBuilder() + .withName("type_tunnel").withSerializer(serializer) + .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); + orderRelationship = storageService + .<ApplicationId, Set<TunnelSubscription>>eventuallyConsistentMapBuilder() + .withName("type_tunnel").withSerializer(serializer) + .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); + idGenerator = coreService.getIdGenerator(runnelOpTopoic); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + orderRelationship.destroy(); + tunnelIdAsKeyStore.destroy(); + srcAndDstKeyStore.destroy(); + typeKeyStore.destroy(); + tunnelNameAsKeyStore.destroy(); + log.info("Stopped"); + } + + @Override + public TunnelId createOrUpdateTunnel(Tunnel tunnel) { + // tunnelIdAsKeyStore. + if (tunnel.tunnelId() != null && !"".equals(tunnel.tunnelId())) { + Tunnel old = tunnelIdAsKeyStore.get(tunnel.tunnelId()); + if (old == null) { + log.info("This tunnel[" + tunnel.tunnelId() + "] is not available."); + return tunnel.tunnelId(); + } + DefaultAnnotations oldAnno = (DefaultAnnotations) old.annotations(); + SparseAnnotations newAnno = (SparseAnnotations) tunnel.annotations(); + Tunnel newT = new DefaultTunnel(old.providerId(), old.src(), + old.dst(), old.type(), + old.state(), old.groupId(), + old.tunnelId(), + old.tunnelName(), + old.path(), + DefaultAnnotations.merge(oldAnno, newAnno)); + tunnelIdAsKeyStore.put(tunnel.tunnelId(), newT); + TunnelEvent event = new TunnelEvent( + TunnelEvent.Type.TUNNEL_UPDATED, + tunnel); + notifyDelegate(event); + return tunnel.tunnelId(); + } else { + TunnelId tunnelId = TunnelId.valueOf(idGenerator.getNewId()); + Tunnel newT = new DefaultTunnel(tunnel.providerId(), tunnel.src(), + tunnel.dst(), tunnel.type(), + tunnel.state(), tunnel.groupId(), + tunnelId, + tunnel.tunnelName(), + tunnel.path(), + tunnel.annotations()); + TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst()); + tunnelIdAsKeyStore.put(tunnelId, newT); + Set<TunnelId> tunnelnameSet = tunnelNameAsKeyStore.get(tunnel + .tunnelName()); + if (tunnelnameSet == null) { + tunnelnameSet = new HashSet<TunnelId>(); + } + tunnelnameSet.add(tunnelId); + tunnelNameAsKeyStore.put(tunnel + .tunnelName(), tunnelnameSet); + Set<TunnelId> srcAndDstKeySet = srcAndDstKeyStore.get(key); + if (srcAndDstKeySet == null) { + srcAndDstKeySet = new HashSet<TunnelId>(); + } + srcAndDstKeySet.add(tunnelId); + srcAndDstKeyStore.put(key, srcAndDstKeySet); + Set<TunnelId> typeKeySet = typeKeyStore.get(tunnel.type()); + if (typeKeySet == null) { + typeKeySet = new HashSet<TunnelId>(); + } + typeKeySet.add(tunnelId); + typeKeyStore.put(tunnel.type(), typeKeySet); + TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_ADDED, + tunnel); + notifyDelegate(event); + return tunnelId; + } + } + + @Override + public void deleteTunnel(TunnelId tunnelId) { + Tunnel deletedTunnel = tunnelIdAsKeyStore.get(tunnelId); + if (deletedTunnel == null) { + return; + } + tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()).remove(tunnelId); + tunnelIdAsKeyStore.remove(tunnelId); + TunnelKey key = new TunnelKey(deletedTunnel.src(), deletedTunnel.dst()); + srcAndDstKeyStore.get(key).remove(tunnelId); + typeKeyStore.get(deletedTunnel.type()).remove(tunnelId); + TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, + deletedTunnel); + notifyDelegate(event); + } + + @Override + public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, + ProviderId producerName) { + TunnelKey key = TunnelKey.tunnelKey(src, dst); + Set<TunnelId> idSet = srcAndDstKeyStore.get(key); + if (idSet == null) { + return; + } + Tunnel deletedTunnel = null; + TunnelEvent event = null; + List<TunnelEvent> ls = new ArrayList<TunnelEvent>(); + for (TunnelId id : idSet) { + deletedTunnel = tunnelIdAsKeyStore.get(id); + event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, + deletedTunnel); + ls.add(event); + if (producerName.equals(deletedTunnel.providerId())) { + tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId()); + tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()) + .remove(deletedTunnel.tunnelId()); + srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId()); + typeKeyStore.get(deletedTunnel.type()) + .remove(deletedTunnel.tunnelId()); + } + } + notifyDelegate(ls); + } + + @Override + public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, Type type, + ProviderId producerName) { + TunnelKey key = TunnelKey.tunnelKey(src, dst); + Set<TunnelId> idSet = srcAndDstKeyStore.get(key); + if (idSet == null) { + return; + } + Tunnel deletedTunnel = null; + TunnelEvent event = null; + List<TunnelEvent> ls = new ArrayList<TunnelEvent>(); + for (TunnelId id : idSet) { + deletedTunnel = tunnelIdAsKeyStore.get(id); + event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, + deletedTunnel); + ls.add(event); + if (producerName.equals(deletedTunnel.providerId()) + && type.equals(deletedTunnel.type())) { + tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId()); + tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()) + .remove(deletedTunnel.tunnelId()); + srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId()); + typeKeyStore.get(deletedTunnel.type()) + .remove(deletedTunnel.tunnelId()); + } + } + notifyDelegate(ls); + } + + @Override + public Tunnel borrowTunnel(ApplicationId appId, TunnelId tunnelId, + Annotations... annotations) { + Set<TunnelSubscription> orderSet = orderRelationship.get(appId); + if (orderSet == null) { + orderSet = new HashSet<TunnelSubscription>(); + } + TunnelSubscription order = new TunnelSubscription(appId, null, null, tunnelId, null, null, + annotations); + Tunnel result = tunnelIdAsKeyStore.get(tunnelId); + if (result != null || Tunnel.State.INACTIVE.equals(result.state())) { + return null; + } + orderSet.add(order); + orderRelationship.put(appId, orderSet); + return result; + } + + @Override + public Collection<Tunnel> borrowTunnel(ApplicationId appId, + TunnelEndPoint src, + TunnelEndPoint dst, + Annotations... annotations) { + Set<TunnelSubscription> orderSet = orderRelationship.get(appId); + if (orderSet == null) { + orderSet = new HashSet<TunnelSubscription>(); + } + TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, null, null, annotations); + boolean isExist = orderSet.contains(order); + if (!isExist) { + orderSet.add(order); + } + orderRelationship.put(appId, orderSet); + TunnelKey key = TunnelKey.tunnelKey(src, dst); + Set<TunnelId> idSet = srcAndDstKeyStore.get(key); + if (idSet == null || idSet.size() == 0) { + return Collections.emptySet(); + } + Collection<Tunnel> tunnelSet = new HashSet<Tunnel>(); + for (TunnelId tunnelId : idSet) { + Tunnel result = tunnelIdAsKeyStore.get(tunnelId); + if (Tunnel.State.ACTIVE.equals(result.state())) { + tunnelSet.add(result); + } + } + return tunnelSet; + } + + @Override + public Collection<Tunnel> borrowTunnel(ApplicationId appId, + TunnelEndPoint src, + TunnelEndPoint dst, Type type, + Annotations... annotations) { + Set<TunnelSubscription> orderSet = orderRelationship.get(appId); + if (orderSet == null) { + orderSet = new HashSet<TunnelSubscription>(); + } + TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, type, null, annotations); + boolean isExist = orderSet.contains(order); + if (!isExist) { + orderSet.add(order); + } + orderRelationship.put(appId, orderSet); + TunnelKey key = TunnelKey.tunnelKey(src, dst); + Set<TunnelId> idSet = srcAndDstKeyStore.get(key); + if (idSet == null || idSet.size() == 0) { + return Collections.emptySet(); + } + Collection<Tunnel> tunnelSet = new HashSet<Tunnel>(); + for (TunnelId tunnelId : idSet) { + Tunnel result = tunnelIdAsKeyStore.get(tunnelId); + if (type.equals(result.type()) + && Tunnel.State.ACTIVE.equals(result.state())) { + tunnelSet.add(result); + } + } + return tunnelSet; + } + + @Override + public Collection<Tunnel> borrowTunnel(ApplicationId appId, + TunnelName tunnelName, + Annotations... annotations) { + Set<TunnelSubscription> orderSet = orderRelationship.get(appId); + if (orderSet == null) { + orderSet = new HashSet<TunnelSubscription>(); + } + TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName, + annotations); + boolean isExist = orderSet.contains(order); + if (!isExist) { + orderSet.add(order); + } + orderRelationship.put(appId, orderSet); + Set<TunnelId> idSet = tunnelNameAsKeyStore.get(tunnelName); + if (idSet == null || idSet.size() == 0) { + return Collections.emptySet(); + } + Collection<Tunnel> tunnelSet = new HashSet<Tunnel>(); + for (TunnelId tunnelId : idSet) { + Tunnel result = tunnelIdAsKeyStore.get(tunnelId); + if (Tunnel.State.ACTIVE.equals(result.state())) { + tunnelSet.add(result); + } + } + return tunnelSet; + } + + @Override + public boolean returnTunnel(ApplicationId appId, TunnelName tunnelName, + Annotations... annotations) { + TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName, + annotations); + return deleteOrder(order); + } + + @Override + public boolean returnTunnel(ApplicationId appId, TunnelId tunnelId, + Annotations... annotations) { + TunnelSubscription order = new TunnelSubscription(appId, null, null, tunnelId, null, null, + annotations); + return deleteOrder(order); + } + + @Override + public boolean returnTunnel(ApplicationId appId, TunnelEndPoint src, + TunnelEndPoint dst, Type type, + Annotations... annotations) { + TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, type, null, annotations); + return deleteOrder(order); + } + + @Override + public boolean returnTunnel(ApplicationId appId, TunnelEndPoint src, + TunnelEndPoint dst, Annotations... annotations) { + TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, null, null, annotations); + return deleteOrder(order); + } + + private boolean deleteOrder(TunnelSubscription order) { + Set<TunnelSubscription> orderSet = orderRelationship.get(order.consumerId()); + if (orderSet == null) { + return true; + } + if (orderSet.contains(order)) { + orderSet.remove(order); + return true; + } + return false; + } + + @Override + public Tunnel queryTunnel(TunnelId tunnelId) { + return tunnelIdAsKeyStore.get(tunnelId); + } + + @Override + public Collection<TunnelSubscription> queryTunnelSubscription(ApplicationId appId) { + return orderRelationship.get(appId) != null ? ImmutableSet.copyOf(orderRelationship + .get(appId)) : Collections.emptySet(); + } + + @Override + public Collection<Tunnel> queryTunnel(Type type) { + Collection<Tunnel> result = new HashSet<Tunnel>(); + Set<TunnelId> tunnelIds = typeKeyStore.get(type); + if (tunnelIds == null) { + return Collections.emptySet(); + } + for (TunnelId id : tunnelIds) { + result.add(tunnelIdAsKeyStore.get(id)); + } + return result.size() == 0 ? Collections.emptySet() : ImmutableSet + .copyOf(result); + } + + @Override + public Collection<Tunnel> queryTunnel(TunnelEndPoint src, TunnelEndPoint dst) { + Collection<Tunnel> result = new HashSet<Tunnel>(); + TunnelKey key = TunnelKey.tunnelKey(src, dst); + Set<TunnelId> tunnelIds = srcAndDstKeyStore.get(key); + if (tunnelIds == null) { + return Collections.emptySet(); + } + for (TunnelId id : tunnelIds) { + result.add(tunnelIdAsKeyStore.get(id)); + } + return result.size() == 0 ? Collections.emptySet() : ImmutableSet + .copyOf(result); + } + + @Override + public Collection<Tunnel> queryAllTunnels() { + return tunnelIdAsKeyStore.values(); + } + + @Override + public int tunnelCount() { + return tunnelIdAsKeyStore.size(); + } + + /** + * Uses source TunnelPoint and destination TunnelPoint as map key. + */ + private static final class TunnelKey { + private final TunnelEndPoint src; + private final TunnelEndPoint dst; + + private TunnelKey(TunnelEndPoint src, TunnelEndPoint dst) { + this.src = src; + this.dst = dst; + + } + + /** + * create a map key. + * + * @param src + * @param dst + * @return a key using source ip and destination ip + */ + static TunnelKey tunnelKey(TunnelEndPoint src, TunnelEndPoint dst) { + return new TunnelKey(src, dst); + } + + @Override + public int hashCode() { + return Objects.hash(src, dst); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof TunnelKey) { + final TunnelKey other = (TunnelKey) obj; + return Objects.equals(this.src, other.src) + && Objects.equals(this.dst, other.dst); + } + return false; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()).add("src", src) + .add("dst", dst).toString(); + } + } +} diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java new file mode 100644 index 00000000..f0c06f74 --- /dev/null +++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of distributed tunnel store using p2p synchronization protocol. + */ +package org.onosproject.incubator.store.tunnel.impl; |