aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/incubator/store
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
committerAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
commit13d05bc8458758ee39cb829098241e89616717ee (patch)
tree22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/incubator/store
parent6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff)
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/incubator/store')
-rw-r--r--framework/src/onos/incubator/store/pom.xml89
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java20
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java258
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java52
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java543
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java17
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java20
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java532
-rw-r--r--framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java20
9 files changed, 1551 insertions, 0 deletions
diff --git a/framework/src/onos/incubator/store/pom.xml b/framework/src/onos/incubator/store/pom.xml
new file mode 100644
index 00000000..54575582
--- /dev/null
+++ b/framework/src/onos/incubator/store/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Copyright 2015 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>onos-incubator-store</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>ONOS incubating distributed store subsystems</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-api</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-dist</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java
new file mode 100644
index 00000000..2755a98f
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Incubating distributed store implementations.
+ */
+package org.onosproject.incubator.store.impl; \ No newline at end of file
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
new file mode 100644
index 00000000..32890cb1
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.meter.impl;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterState;
+import org.onosproject.net.meter.MeterStore;
+import org.onosproject.net.meter.MeterStoreDelegate;
+import org.onosproject.net.meter.MeterStoreResult;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A distributed meter store implementation. Meters are stored consistently
+ * across the cluster.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
+ implements MeterStore {
+
+ private Logger log = getLogger(getClass());
+
+ private static final String METERSTORE = "onos-meter-store";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ private ConsistentMap<MeterId, MeterData> meters;
+ private NodeId local;
+
+ private MapEventListener mapListener = new InternalMapEventListener();
+
+ private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
+ Maps.newConcurrentMap();
+
+ @Activate
+ public void activate() {
+
+ local = clusterService.getLocalNode().id();
+
+
+ meters = storageService.<MeterId, MeterData>consistentMapBuilder()
+ .withName(METERSTORE)
+ .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+ MeterData.class,
+ DefaultMeter.class,
+ DefaultBand.class,
+ Band.Type.class,
+ MeterState.class,
+ Meter.Unit.class,
+ MeterFailReason.class,
+ MeterId.class)).build();
+
+ meters.addListener(mapListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+
+ meters.removeListener(mapListener);
+ log.info("Stopped");
+ }
+
+
+ @Override
+ public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future);
+ MeterData data = new MeterData(meter, null, local);
+
+ try {
+ meters.put(meter.id(), data);
+ } catch (StorageException e) {
+ future.completeExceptionally(e);
+ }
+
+ return future;
+
+ }
+
+ @Override
+ public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future);
+
+ MeterData data = new MeterData(meter, null, local);
+
+ // update the state of the meter. It will be pruned by observing
+ // that it has been removed from the dataplane.
+ try {
+ if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+ future.complete(MeterStoreResult.success());
+ }
+ } catch (StorageException e) {
+ future.completeExceptionally(e);
+ }
+
+
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future);
+
+ MeterData data = new MeterData(meter, null, local);
+ try {
+ if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+ future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
+ }
+ } catch (StorageException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ @Override
+ public void updateMeterState(Meter meter) {
+ meters.computeIfPresent(meter.id(), (id, v) -> {
+ DefaultMeter m = (DefaultMeter) v.meter();
+ m.setState(meter.state());
+ m.setProcessedPackets(meter.packetsSeen());
+ m.setProcessedBytes(meter.bytesSeen());
+ m.setLife(meter.life());
+ // TODO: Prune if drops to zero.
+ m.setReferenceCount(meter.referenceCount());
+ return new MeterData(m, null, v.origin());
+ });
+ }
+
+ @Override
+ public Meter getMeter(MeterId meterId) {
+ MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
+ return data == null ? null : data.meter();
+ }
+
+ @Override
+ public Collection<Meter> getAllMeters() {
+ return Collections2.transform(meters.asJavaMap().values(),
+ MeterData::meter);
+ }
+
+ @Override
+ public void failedMeter(MeterOperation op, MeterFailReason reason) {
+ meters.computeIfPresent(op.meter().id(), (k, v) ->
+ new MeterData(v.meter(), reason, v.origin()));
+ }
+
+ @Override
+ public void deleteMeterNow(Meter m) {
+ futures.remove(m.id());
+ meters.remove(m.id());
+ }
+
+ private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
+ @Override
+ public void event(MapEvent<MeterId, MeterData> event) {
+ MeterData data = event.value().value();
+ NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
+ switch (event.type()) {
+ case INSERT:
+ case UPDATE:
+ switch (data.meter().state()) {
+ case PENDING_ADD:
+ case PENDING_REMOVE:
+ if (!data.reason().isPresent() && local.equals(master)) {
+ notifyDelegate(
+ new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
+ MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
+ data.meter()));
+ } else if (data.reason().isPresent() && local.equals(data.origin())) {
+ MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
+ //TODO: No future -> no friend
+ futures.get(data.meter().id()).complete(msr);
+ }
+ break;
+ case ADDED:
+ if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) {
+ futures.remove(data.meter().id()).complete(MeterStoreResult.success());
+ }
+ break;
+ case REMOVED:
+ if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
+ futures.remove(data.meter().id()).complete(MeterStoreResult.success());
+ }
+ break;
+ default:
+ log.warn("Unknown meter state type {}", data.meter().state());
+ }
+ break;
+ case REMOVE:
+ //Only happens at origin so we do not need to care.
+ break;
+ default:
+ log.warn("Unknown Map event type {}", event.type());
+ }
+
+ }
+ }
+
+
+}
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java
new file mode 100644
index 00000000..c72bc2e3
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/MeterData.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.meter.impl;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterFailReason;
+
+import java.util.Optional;
+
+/**
+ * A class representing the meter information stored in the meter store.
+ */
+public class MeterData {
+
+ private final Meter meter;
+ private final Optional<MeterFailReason> reason;
+ private final NodeId origin;
+
+ public MeterData(Meter meter, MeterFailReason reason, NodeId origin) {
+ this.meter = meter;
+ this.reason = Optional.ofNullable(reason);
+ this.origin = origin;
+ }
+
+ public Meter meter() {
+ return meter;
+ }
+
+ public Optional<MeterFailReason> reason() {
+ return this.reason;
+ }
+
+ public NodeId origin() {
+ return this.origin;
+ }
+
+
+}
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
new file mode 100644
index 00000000..a014504c
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
@@ -0,0 +1,543 @@
+package org.onosproject.incubator.store.resource.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
+import org.onosproject.incubator.net.resource.label.LabelResource;
+import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
+import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
+import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
+import org.onosproject.incubator.net.resource.label.LabelResourceId;
+import org.onosproject.incubator.net.resource.label.LabelResourcePool;
+import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
+import org.onosproject.incubator.net.resource.label.LabelResourceStore;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+/**
+ * Manages label resources using copycat.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedLabelResourceStore
+ extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
+ implements LabelResourceStore {
+ private final Logger log = getLogger(getClass());
+
+ private static final String POOL_MAP_NAME = "labelresourcepool";
+
+ private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
+
+ private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private ExecutorService messageHandlingExecutor;
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+ private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
+
+ private static final Serializer SERIALIZER = Serializer
+ .using(new KryoNamespace.Builder().register(KryoNamespaces.API)
+ .register(LabelResourceEvent.class)
+ .register(LabelResourcePool.class).register(DeviceId.class)
+ .register(LabelResourceRequest.class)
+ .register(LabelResourceRequest.Type.class)
+ .register(LabelResourceEvent.Type.class)
+ .register(DefaultLabelResource.class)
+ .register(LabelResourceId.class)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
+
+ @Activate
+ public void activate() {
+
+ resourcePool = storageService
+ .<DeviceId, LabelResourcePool>consistentMapBuilder()
+ .withName(POOL_MAP_NAME).withSerializer(SERIALIZER)
+ .withPartitionsDisabled().build();
+ messageHandlingExecutor = Executors
+ .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/flow",
+ "message-handlers"));
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LabelResourcePool operation = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ operation);
+ boolean b = internalCreate(operation);
+ message.respond(SERIALIZER.encode(b));
+ }
+ }, messageHandlingExecutor);
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ DeviceId deviceId = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ deviceId);
+ boolean b = internalDestroy(deviceId);
+ message.respond(SERIALIZER.encode(b));
+ }
+ }, messageHandlingExecutor);
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LabelResourceRequest request = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ request);
+ final Collection<LabelResource> resource = internalApply(request);
+ message.respond(SERIALIZER
+ .encode(resource));
+ }
+ }, messageHandlingExecutor);
+ clusterCommunicator
+ .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+ new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LabelResourceRequest request = SERIALIZER
+ .decode(message.payload());
+ log.trace("received get flow entry request for {}",
+ request);
+ final boolean isSuccess = internalRelease(request);
+ message.respond(SERIALIZER
+ .encode(isSuccess));
+ }
+ }, messageHandlingExecutor);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
+ clusterCommunicator
+ .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
+ messageHandlingExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public boolean createDevicePool(DeviceId deviceId,
+ LabelResourceId beginLabel,
+ LabelResourceId endLabel) {
+ LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
+ beginLabel.labelId(),
+ endLabel.labelId());
+ return this.create(pool);
+ }
+
+ @Override
+ public boolean createGlobalPool(LabelResourceId beginLabel,
+ LabelResourceId endLabel) {
+ LabelResourcePool pool = new LabelResourcePool(
+ GLOBAL_RESOURCE_POOL_DEVICE_ID,
+ beginLabel.labelId(),
+ endLabel.labelId());
+ return this.internalCreate(pool);
+ }
+
+ private boolean create(LabelResourcePool pool) {
+ Device device = (Device) deviceService.getDevice(pool.deviceId());
+ if (device == null) {
+ return false;
+ }
+
+ NodeId master = mastershipService.getMasterFor(pool.deviceId());
+
+ if (master == null) {
+ log.warn("Failed to create label resource pool: No master for {}", pool);
+ return false;
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalCreate(pool);
+ }
+
+ log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+ master, pool.deviceId());
+
+ return complete(clusterCommunicator
+ .sendAndReceive(pool,
+ LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+
+ private boolean internalCreate(LabelResourcePool pool) {
+ Versioned<LabelResourcePool> poolOld = resourcePool
+ .get(pool.deviceId());
+ if (poolOld == null) {
+ resourcePool.put(pool.deviceId(), pool);
+ LabelResourceEvent event = new LabelResourceEvent(
+ Type.POOL_CREATED,
+ pool);
+ notifyDelegate(event);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean destroyDevicePool(DeviceId deviceId) {
+ Device device = (Device) deviceService.getDevice(deviceId);
+ if (device == null) {
+ return false;
+ }
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
+ return false;
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalDestroy(deviceId);
+ }
+
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
+
+ return complete(clusterCommunicator
+ .sendAndReceive(deviceId,
+ LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+
+ private boolean internalDestroy(DeviceId deviceId) {
+ Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
+ if (poolOld != null) {
+ resourcePool.remove(deviceId);
+ LabelResourceEvent event = new LabelResourceEvent(
+ Type.POOL_CREATED,
+ poolOld.value());
+ notifyDelegate(event);
+ }
+ log.info("success to destroy the label resource pool of device id {}",
+ deviceId);
+ return true;
+ }
+
+ @Override
+ public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
+ long applyNum) {
+ Device device = (Device) deviceService.getDevice(deviceId);
+ if (device == null) {
+ return Collections.emptyList();
+ }
+ LabelResourceRequest request = new LabelResourceRequest(
+ deviceId,
+ LabelResourceRequest.Type.APPLY,
+ applyNum, null);
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
+ return Collections.emptyList();
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalApply(request);
+ }
+
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
+
+ return complete(clusterCommunicator
+ .sendAndReceive(request,
+ LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+
+ private Collection<LabelResource> internalApply(LabelResourceRequest request) {
+ DeviceId deviceId = request.deviceId();
+ long applyNum = request.applyNum();
+ Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
+ LabelResourcePool pool = poolOld.value();
+ Collection<LabelResource> result = new HashSet<LabelResource>();
+ long freeNum = this.getFreeNumOfDevicePool(deviceId);
+ if (applyNum > freeNum) {
+ log.info("the free number of the label resource pool of deviceId {} is not enough.");
+ return Collections.emptyList();
+ }
+ Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
+ pool.releaseLabelId());
+ long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
+ .size();
+ LabelResource resource = null;
+ for (int i = 0; i < tmp; i++) {
+ Iterator<LabelResource> it = releaseLabels.iterator();
+ if (it.hasNext()) {
+ resource = it.next();
+ releaseLabels.remove(resource);
+ }
+ result.add(resource);
+ }
+ for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
+ .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
+ resource = new DefaultLabelResource(deviceId,
+ LabelResourceId
+ .labelResourceId(j));
+ result.add(resource);
+ }
+ long beginLabel = pool.beginLabel().labelId();
+ long endLabel = pool.endLabel().labelId();
+ long totalNum = pool.totalNum();
+ long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
+ long usedNum = pool.usedNum() + applyNum;
+ ImmutableSet<LabelResource> freeLabel = ImmutableSet
+ .copyOf(releaseLabels);
+ LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+ beginLabel, endLabel,
+ totalNum, usedNum,
+ current, freeLabel);
+ resourcePool.put(deviceId, newPool);
+ log.info("success to apply label resource");
+ return result;
+ }
+
+ @Override
+ public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
+ Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
+ Set<DeviceId> deviceIdSet = maps.keySet();
+ LabelResourceRequest request = null;
+ for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
+ DeviceId deviceId = (DeviceId) it.next();
+ Device device = (Device) deviceService.getDevice(deviceId);
+ if (device == null) {
+ continue;
+ }
+ ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
+ .get(deviceId));
+ request = new LabelResourceRequest(
+ deviceId,
+ LabelResourceRequest.Type.RELEASE,
+ 0, collection);
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
+ return false;
+ }
+
+ if (master.equals(clusterService.getLocalNode().id())) {
+ return internalRelease(request);
+ }
+
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
+
+ return complete(clusterCommunicator
+ .sendAndReceive(request,
+ LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+ SERIALIZER::encode, SERIALIZER::decode,
+ master));
+ }
+ return false;
+ }
+
+ private boolean internalRelease(LabelResourceRequest request) {
+ DeviceId deviceId = request.deviceId();
+ Collection<LabelResource> release = request.releaseCollection();
+ Versioned<LabelResourcePool> poolOld = resourcePool.get(deviceId);
+ LabelResourcePool pool = poolOld.value();
+ if (pool == null) {
+ log.info("the label resource pool of device id {} does not exist");
+ return false;
+ }
+ Set<LabelResource> storeSet = new HashSet<LabelResource>(
+ pool.releaseLabelId());
+ LabelResource labelResource = null;
+ long realReleasedNum = 0;
+ for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
+ labelResource = it.next();
+ if (labelResource.labelResourceId().labelId() < pool.beginLabel()
+ .labelId()
+ || labelResource.labelResourceId().labelId() > pool
+ .endLabel().labelId()) {
+ continue;
+ }
+ if (pool.currentUsedMaxLabelId().labelId() > labelResource
+ .labelResourceId().labelId()
+ || !storeSet.contains(labelResource)) {
+ storeSet.add(labelResource);
+ realReleasedNum++;
+ }
+ }
+ long beginNum = pool.beginLabel().labelId();
+ long endNum = pool.endLabel().labelId();
+ long totalNum = pool.totalNum();
+ long usedNum = pool.usedNum() - realReleasedNum;
+ long current = pool.currentUsedMaxLabelId().labelId();
+ ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
+ LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+ beginNum, endNum,
+ totalNum, usedNum,
+ current, s);
+ resourcePool.put(deviceId, newPool);
+ log.info("success to release label resource");
+ return true;
+ }
+
+ @Override
+ public boolean isDevicePoolFull(DeviceId deviceId) {
+ Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
+ if (pool == null) {
+ return true;
+ }
+ return pool.value().currentUsedMaxLabelId() == pool.value().endLabel()
+ && pool.value().releaseLabelId().size() == 0 ? true : false;
+ }
+
+ @Override
+ public long getFreeNumOfDevicePool(DeviceId deviceId) {
+ Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
+ if (pool == null) {
+ return 0;
+ }
+ return pool.value().endLabel().labelId()
+ - pool.value().currentUsedMaxLabelId().labelId()
+ + pool.value().releaseLabelId().size();
+ }
+
+ @Override
+ public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
+ Versioned<LabelResourcePool> pool = resourcePool.get(deviceId);
+ return pool == null ? null : pool.value();
+ }
+
+ @Override
+ public boolean destroyGlobalPool() {
+ return this.internalDestroy(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ @Override
+ public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
+ LabelResourceRequest request = new LabelResourceRequest(
+ DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+ LabelResourceRequest.Type.APPLY,
+ applyNum, null);
+ return this.internalApply(request);
+ }
+
+ @Override
+ public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
+ Set<LabelResource> set = new HashSet<LabelResource>();
+ DefaultLabelResource resource = null;
+ for (LabelResourceId labelResource : release) {
+ resource = new DefaultLabelResource(
+ DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+ labelResource);
+ set.add(resource);
+ }
+ LabelResourceRequest request = new LabelResourceRequest(
+ DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+ LabelResourceRequest.Type.APPLY,
+ 0,
+ ImmutableSet
+ .copyOf(set));
+ return this.internalRelease(request);
+ }
+
+ @Override
+ public boolean isGlobalPoolFull() {
+ return this.isDevicePoolFull(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ @Override
+ public long getFreeNumOfGlobalPool() {
+ return this.getFreeNumOfDevicePool(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ @Override
+ public LabelResourcePool getGlobalLabelResourcePool() {
+ return this.getDeviceLabelResourcePool(DeviceId
+ .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+ }
+
+ private <T> T complete(Future<T> future) {
+ try {
+ return future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted while waiting for operation to complete.", e);
+ return null;
+ } catch (TimeoutException | ExecutionException e) {
+ log.error("Failed remote operation", e);
+ return null;
+ }
+ }
+}
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java
new file mode 100644
index 00000000..0a6f1640
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/LabelResourceMessageSubjects.java
@@ -0,0 +1,17 @@
+package org.onosproject.incubator.store.resource.impl;
+
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+public final class LabelResourceMessageSubjects {
+
+ private LabelResourceMessageSubjects() {
+ }
+ public static final MessageSubject LABEL_POOL_CREATED
+ = new MessageSubject("label-resource-pool-created");
+ public static final MessageSubject LABEL_POOL_DESTROYED
+ = new MessageSubject("label-resource-pool-destroyed");
+ public static final MessageSubject LABEL_POOL_APPLY
+ = new MessageSubject("label-resource-pool-apply");
+ public static final MessageSubject LABEL_POOL_RELEASE
+ = new MessageSubject("label-resource-pool-release");
+}
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java
new file mode 100644
index 00000000..da0dc2e5
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the label resource distributed store.
+ */
+package org.onosproject.incubator.store.resource.impl; \ No newline at end of file
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
new file mode 100644
index 00000000..78c6468e
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
@@ -0,0 +1,532 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.tunnel.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.incubator.net.tunnel.DefaultTunnel;
+import org.onosproject.incubator.net.tunnel.Tunnel;
+import org.onosproject.incubator.net.tunnel.Tunnel.Type;
+import org.onosproject.incubator.net.tunnel.TunnelEndPoint;
+import org.onosproject.incubator.net.tunnel.TunnelEvent;
+import org.onosproject.incubator.net.tunnel.TunnelId;
+import org.onosproject.incubator.net.tunnel.TunnelName;
+import org.onosproject.incubator.net.tunnel.TunnelStore;
+import org.onosproject.incubator.net.tunnel.TunnelStoreDelegate;
+import org.onosproject.incubator.net.tunnel.TunnelSubscription;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.app.GossipApplicationStore.InternalState;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Manages inventory of tunnel in distributed data store that uses optimistic
+ * replication and gossip based techniques.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedTunnelStore
+ extends AbstractStore<TunnelEvent, TunnelStoreDelegate>
+ implements TunnelStore {
+
+ private final Logger log = getLogger(getClass());
+
+ /**
+ * The topic used for obtaining globally unique ids.
+ */
+ private String runnelOpTopoic = "tunnel-ops-ids";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ // tunnel identity as map key in the store.
+ private EventuallyConsistentMap<TunnelId, Tunnel> tunnelIdAsKeyStore;
+ // tunnel name as map key in the store.
+ private EventuallyConsistentMap<TunnelName, Set<TunnelId>> tunnelNameAsKeyStore;
+ // maintains all the tunnels between source and destination.
+ private EventuallyConsistentMap<TunnelKey, Set<TunnelId>> srcAndDstKeyStore;
+ // maintains all the tunnels by tunnel type.
+ private EventuallyConsistentMap<Tunnel.Type, Set<TunnelId>> typeKeyStore;
+ // maintains records that app subscribes tunnel.
+ private EventuallyConsistentMap<ApplicationId, Set<TunnelSubscription>> orderRelationship;
+
+ private IdGenerator idGenerator;
+
+ @Activate
+ public void activate() {
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MultiValuedTimestamp.class)
+ .register(InternalState.class);
+ tunnelIdAsKeyStore = storageService
+ .<TunnelId, Tunnel>eventuallyConsistentMapBuilder()
+ .withName("all_tunnel").withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+ tunnelNameAsKeyStore = storageService
+ .<TunnelName, Set<TunnelId>>eventuallyConsistentMapBuilder()
+ .withName("tunnel_name_tunnel").withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+ srcAndDstKeyStore = storageService
+ .<TunnelKey, Set<TunnelId>>eventuallyConsistentMapBuilder()
+ .withName("src_dst_tunnel").withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+ typeKeyStore = storageService
+ .<Tunnel.Type, Set<TunnelId>>eventuallyConsistentMapBuilder()
+ .withName("type_tunnel").withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+ orderRelationship = storageService
+ .<ApplicationId, Set<TunnelSubscription>>eventuallyConsistentMapBuilder()
+ .withName("type_tunnel").withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+ idGenerator = coreService.getIdGenerator(runnelOpTopoic);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ orderRelationship.destroy();
+ tunnelIdAsKeyStore.destroy();
+ srcAndDstKeyStore.destroy();
+ typeKeyStore.destroy();
+ tunnelNameAsKeyStore.destroy();
+ log.info("Stopped");
+ }
+
+ @Override
+ public TunnelId createOrUpdateTunnel(Tunnel tunnel) {
+ // tunnelIdAsKeyStore.
+ if (tunnel.tunnelId() != null && !"".equals(tunnel.tunnelId())) {
+ Tunnel old = tunnelIdAsKeyStore.get(tunnel.tunnelId());
+ if (old == null) {
+ log.info("This tunnel[" + tunnel.tunnelId() + "] is not available.");
+ return tunnel.tunnelId();
+ }
+ DefaultAnnotations oldAnno = (DefaultAnnotations) old.annotations();
+ SparseAnnotations newAnno = (SparseAnnotations) tunnel.annotations();
+ Tunnel newT = new DefaultTunnel(old.providerId(), old.src(),
+ old.dst(), old.type(),
+ old.state(), old.groupId(),
+ old.tunnelId(),
+ old.tunnelName(),
+ old.path(),
+ DefaultAnnotations.merge(oldAnno, newAnno));
+ tunnelIdAsKeyStore.put(tunnel.tunnelId(), newT);
+ TunnelEvent event = new TunnelEvent(
+ TunnelEvent.Type.TUNNEL_UPDATED,
+ tunnel);
+ notifyDelegate(event);
+ return tunnel.tunnelId();
+ } else {
+ TunnelId tunnelId = TunnelId.valueOf(idGenerator.getNewId());
+ Tunnel newT = new DefaultTunnel(tunnel.providerId(), tunnel.src(),
+ tunnel.dst(), tunnel.type(),
+ tunnel.state(), tunnel.groupId(),
+ tunnelId,
+ tunnel.tunnelName(),
+ tunnel.path(),
+ tunnel.annotations());
+ TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst());
+ tunnelIdAsKeyStore.put(tunnelId, newT);
+ Set<TunnelId> tunnelnameSet = tunnelNameAsKeyStore.get(tunnel
+ .tunnelName());
+ if (tunnelnameSet == null) {
+ tunnelnameSet = new HashSet<TunnelId>();
+ }
+ tunnelnameSet.add(tunnelId);
+ tunnelNameAsKeyStore.put(tunnel
+ .tunnelName(), tunnelnameSet);
+ Set<TunnelId> srcAndDstKeySet = srcAndDstKeyStore.get(key);
+ if (srcAndDstKeySet == null) {
+ srcAndDstKeySet = new HashSet<TunnelId>();
+ }
+ srcAndDstKeySet.add(tunnelId);
+ srcAndDstKeyStore.put(key, srcAndDstKeySet);
+ Set<TunnelId> typeKeySet = typeKeyStore.get(tunnel.type());
+ if (typeKeySet == null) {
+ typeKeySet = new HashSet<TunnelId>();
+ }
+ typeKeySet.add(tunnelId);
+ typeKeyStore.put(tunnel.type(), typeKeySet);
+ TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_ADDED,
+ tunnel);
+ notifyDelegate(event);
+ return tunnelId;
+ }
+ }
+
+ @Override
+ public void deleteTunnel(TunnelId tunnelId) {
+ Tunnel deletedTunnel = tunnelIdAsKeyStore.get(tunnelId);
+ if (deletedTunnel == null) {
+ return;
+ }
+ tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()).remove(tunnelId);
+ tunnelIdAsKeyStore.remove(tunnelId);
+ TunnelKey key = new TunnelKey(deletedTunnel.src(), deletedTunnel.dst());
+ srcAndDstKeyStore.get(key).remove(tunnelId);
+ typeKeyStore.get(deletedTunnel.type()).remove(tunnelId);
+ TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
+ deletedTunnel);
+ notifyDelegate(event);
+ }
+
+ @Override
+ public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst,
+ ProviderId producerName) {
+ TunnelKey key = TunnelKey.tunnelKey(src, dst);
+ Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+ if (idSet == null) {
+ return;
+ }
+ Tunnel deletedTunnel = null;
+ TunnelEvent event = null;
+ List<TunnelEvent> ls = new ArrayList<TunnelEvent>();
+ for (TunnelId id : idSet) {
+ deletedTunnel = tunnelIdAsKeyStore.get(id);
+ event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
+ deletedTunnel);
+ ls.add(event);
+ if (producerName.equals(deletedTunnel.providerId())) {
+ tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId());
+ tunnelNameAsKeyStore.get(deletedTunnel.tunnelName())
+ .remove(deletedTunnel.tunnelId());
+ srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId());
+ typeKeyStore.get(deletedTunnel.type())
+ .remove(deletedTunnel.tunnelId());
+ }
+ }
+ notifyDelegate(ls);
+ }
+
+ @Override
+ public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, Type type,
+ ProviderId producerName) {
+ TunnelKey key = TunnelKey.tunnelKey(src, dst);
+ Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+ if (idSet == null) {
+ return;
+ }
+ Tunnel deletedTunnel = null;
+ TunnelEvent event = null;
+ List<TunnelEvent> ls = new ArrayList<TunnelEvent>();
+ for (TunnelId id : idSet) {
+ deletedTunnel = tunnelIdAsKeyStore.get(id);
+ event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
+ deletedTunnel);
+ ls.add(event);
+ if (producerName.equals(deletedTunnel.providerId())
+ && type.equals(deletedTunnel.type())) {
+ tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId());
+ tunnelNameAsKeyStore.get(deletedTunnel.tunnelName())
+ .remove(deletedTunnel.tunnelId());
+ srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId());
+ typeKeyStore.get(deletedTunnel.type())
+ .remove(deletedTunnel.tunnelId());
+ }
+ }
+ notifyDelegate(ls);
+ }
+
+ @Override
+ public Tunnel borrowTunnel(ApplicationId appId, TunnelId tunnelId,
+ Annotations... annotations) {
+ Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
+ if (orderSet == null) {
+ orderSet = new HashSet<TunnelSubscription>();
+ }
+ TunnelSubscription order = new TunnelSubscription(appId, null, null, tunnelId, null, null,
+ annotations);
+ Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
+ if (result != null || Tunnel.State.INACTIVE.equals(result.state())) {
+ return null;
+ }
+ orderSet.add(order);
+ orderRelationship.put(appId, orderSet);
+ return result;
+ }
+
+ @Override
+ public Collection<Tunnel> borrowTunnel(ApplicationId appId,
+ TunnelEndPoint src,
+ TunnelEndPoint dst,
+ Annotations... annotations) {
+ Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
+ if (orderSet == null) {
+ orderSet = new HashSet<TunnelSubscription>();
+ }
+ TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, null, null, annotations);
+ boolean isExist = orderSet.contains(order);
+ if (!isExist) {
+ orderSet.add(order);
+ }
+ orderRelationship.put(appId, orderSet);
+ TunnelKey key = TunnelKey.tunnelKey(src, dst);
+ Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+ if (idSet == null || idSet.size() == 0) {
+ return Collections.emptySet();
+ }
+ Collection<Tunnel> tunnelSet = new HashSet<Tunnel>();
+ for (TunnelId tunnelId : idSet) {
+ Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
+ if (Tunnel.State.ACTIVE.equals(result.state())) {
+ tunnelSet.add(result);
+ }
+ }
+ return tunnelSet;
+ }
+
+ @Override
+ public Collection<Tunnel> borrowTunnel(ApplicationId appId,
+ TunnelEndPoint src,
+ TunnelEndPoint dst, Type type,
+ Annotations... annotations) {
+ Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
+ if (orderSet == null) {
+ orderSet = new HashSet<TunnelSubscription>();
+ }
+ TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, type, null, annotations);
+ boolean isExist = orderSet.contains(order);
+ if (!isExist) {
+ orderSet.add(order);
+ }
+ orderRelationship.put(appId, orderSet);
+ TunnelKey key = TunnelKey.tunnelKey(src, dst);
+ Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+ if (idSet == null || idSet.size() == 0) {
+ return Collections.emptySet();
+ }
+ Collection<Tunnel> tunnelSet = new HashSet<Tunnel>();
+ for (TunnelId tunnelId : idSet) {
+ Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
+ if (type.equals(result.type())
+ && Tunnel.State.ACTIVE.equals(result.state())) {
+ tunnelSet.add(result);
+ }
+ }
+ return tunnelSet;
+ }
+
+ @Override
+ public Collection<Tunnel> borrowTunnel(ApplicationId appId,
+ TunnelName tunnelName,
+ Annotations... annotations) {
+ Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
+ if (orderSet == null) {
+ orderSet = new HashSet<TunnelSubscription>();
+ }
+ TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName,
+ annotations);
+ boolean isExist = orderSet.contains(order);
+ if (!isExist) {
+ orderSet.add(order);
+ }
+ orderRelationship.put(appId, orderSet);
+ Set<TunnelId> idSet = tunnelNameAsKeyStore.get(tunnelName);
+ if (idSet == null || idSet.size() == 0) {
+ return Collections.emptySet();
+ }
+ Collection<Tunnel> tunnelSet = new HashSet<Tunnel>();
+ for (TunnelId tunnelId : idSet) {
+ Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
+ if (Tunnel.State.ACTIVE.equals(result.state())) {
+ tunnelSet.add(result);
+ }
+ }
+ return tunnelSet;
+ }
+
+ @Override
+ public boolean returnTunnel(ApplicationId appId, TunnelName tunnelName,
+ Annotations... annotations) {
+ TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName,
+ annotations);
+ return deleteOrder(order);
+ }
+
+ @Override
+ public boolean returnTunnel(ApplicationId appId, TunnelId tunnelId,
+ Annotations... annotations) {
+ TunnelSubscription order = new TunnelSubscription(appId, null, null, tunnelId, null, null,
+ annotations);
+ return deleteOrder(order);
+ }
+
+ @Override
+ public boolean returnTunnel(ApplicationId appId, TunnelEndPoint src,
+ TunnelEndPoint dst, Type type,
+ Annotations... annotations) {
+ TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, type, null, annotations);
+ return deleteOrder(order);
+ }
+
+ @Override
+ public boolean returnTunnel(ApplicationId appId, TunnelEndPoint src,
+ TunnelEndPoint dst, Annotations... annotations) {
+ TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, null, null, annotations);
+ return deleteOrder(order);
+ }
+
+ private boolean deleteOrder(TunnelSubscription order) {
+ Set<TunnelSubscription> orderSet = orderRelationship.get(order.consumerId());
+ if (orderSet == null) {
+ return true;
+ }
+ if (orderSet.contains(order)) {
+ orderSet.remove(order);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Tunnel queryTunnel(TunnelId tunnelId) {
+ return tunnelIdAsKeyStore.get(tunnelId);
+ }
+
+ @Override
+ public Collection<TunnelSubscription> queryTunnelSubscription(ApplicationId appId) {
+ return orderRelationship.get(appId) != null ? ImmutableSet.copyOf(orderRelationship
+ .get(appId)) : Collections.emptySet();
+ }
+
+ @Override
+ public Collection<Tunnel> queryTunnel(Type type) {
+ Collection<Tunnel> result = new HashSet<Tunnel>();
+ Set<TunnelId> tunnelIds = typeKeyStore.get(type);
+ if (tunnelIds == null) {
+ return Collections.emptySet();
+ }
+ for (TunnelId id : tunnelIds) {
+ result.add(tunnelIdAsKeyStore.get(id));
+ }
+ return result.size() == 0 ? Collections.emptySet() : ImmutableSet
+ .copyOf(result);
+ }
+
+ @Override
+ public Collection<Tunnel> queryTunnel(TunnelEndPoint src, TunnelEndPoint dst) {
+ Collection<Tunnel> result = new HashSet<Tunnel>();
+ TunnelKey key = TunnelKey.tunnelKey(src, dst);
+ Set<TunnelId> tunnelIds = srcAndDstKeyStore.get(key);
+ if (tunnelIds == null) {
+ return Collections.emptySet();
+ }
+ for (TunnelId id : tunnelIds) {
+ result.add(tunnelIdAsKeyStore.get(id));
+ }
+ return result.size() == 0 ? Collections.emptySet() : ImmutableSet
+ .copyOf(result);
+ }
+
+ @Override
+ public Collection<Tunnel> queryAllTunnels() {
+ return tunnelIdAsKeyStore.values();
+ }
+
+ @Override
+ public int tunnelCount() {
+ return tunnelIdAsKeyStore.size();
+ }
+
+ /**
+ * Uses source TunnelPoint and destination TunnelPoint as map key.
+ */
+ private static final class TunnelKey {
+ private final TunnelEndPoint src;
+ private final TunnelEndPoint dst;
+
+ private TunnelKey(TunnelEndPoint src, TunnelEndPoint dst) {
+ this.src = src;
+ this.dst = dst;
+
+ }
+
+ /**
+ * create a map key.
+ *
+ * @param src
+ * @param dst
+ * @return a key using source ip and destination ip
+ */
+ static TunnelKey tunnelKey(TunnelEndPoint src, TunnelEndPoint dst) {
+ return new TunnelKey(src, dst);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(src, dst);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof TunnelKey) {
+ final TunnelKey other = (TunnelKey) obj;
+ return Objects.equals(this.src, other.src)
+ && Objects.equals(this.dst, other.dst);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass()).add("src", src)
+ .add("dst", dst).toString();
+ }
+ }
+}
diff --git a/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java
new file mode 100644
index 00000000..f0c06f74
--- /dev/null
+++ b/framework/src/onos/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of distributed tunnel store using p2p synchronization protocol.
+ */
+package org.onosproject.incubator.store.tunnel.impl;