summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-10-09 18:32:44 -0700
committerAshlee Young <ashlee@onosfw.com>2015-10-09 18:32:44 -0700
commit6a07d2d622eaa06953f3353e39c080984076e8de (patch)
treebfb50a2090fce186c2cc545a400c969bf2ea702b /framework/src/onos/core/store/dist
parente6d71622143ff9b2421a1abbe8434b954b5b1099 (diff)
Updated master to commit id 6ee8aa3e67ce89908a8c93aa9445c6f71a18f986
Change-Id: I94b055ee2f298daf71e2ec794fd0f2495bd8081f
Diffstat (limited to 'framework/src/onos/core/store/dist')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java4
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java5
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java55
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java18
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java21
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java10
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java24
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java30
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java315
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java89
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java20
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java64
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java62
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java51
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java2
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java4
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java78
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java111
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java252
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java289
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java26
-rw-r--r--framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java95
22 files changed, 1289 insertions, 336 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
index ff3e36ac..52a999a4 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -38,7 +38,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa
* options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p>
*
* Additionally, the database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+ * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
@@ -54,7 +54,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa
* Creates a new database.<p>
*
* The database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+ * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index fbc2c88d..6ea7c220 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -443,7 +443,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
public void event(ApplicationEvent event) {
if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
ApplicationId appId = event.subject().id();
- List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
+ List<DefaultAsyncConsistentMap> mapsToRemove;
+ synchronized (mapsByApplication) {
+ mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
+ }
mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
if (event.type() == APP_UNINSTALLED) {
mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 95f9e39a..1d81f998 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -16,14 +16,14 @@
package org.onosproject.store.consistent.impl;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
/**
* Database proxy.
*/
@@ -45,6 +45,7 @@ public interface DatabaseProxy<K, V> {
/**
* Returns the number of entries in map.
+ *
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
@@ -62,7 +63,7 @@ public interface DatabaseProxy<K, V> {
* Checks whether the map contains a key.
*
* @param mapName map name
- * @param key key to check.
+ * @param key key to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
@@ -71,7 +72,7 @@ public interface DatabaseProxy<K, V> {
* Checks whether the map contains a value.
*
* @param mapName map name
- * @param value The value to check.
+ * @param value The value to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
@@ -80,7 +81,7 @@ public interface DatabaseProxy<K, V> {
* Gets a value from the map.
*
* @param mapName map name
- * @param key The key to get.
+ * @param key The key to get.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
@@ -88,11 +89,11 @@ public interface DatabaseProxy<K, V> {
/**
* Updates the map.
*
- * @param mapName map name
- * @param key The key to set
- * @param valueMatch match for checking existing value
- * @param versionMatch match for checking existing version
- * @param value new value
+ * @param mapName map name
+ * @param key The key to set
+ * @param valueMatch match for checking existing value
+ * @param versionMatch match for checking existing version
+ * @param value new value
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
@@ -130,11 +131,11 @@ public interface DatabaseProxy<K, V> {
*/
CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
- /**
+ /**
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
- * @param delta value to add
+ * @param delta value to add
* @return updated value
*/
CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
@@ -143,11 +144,31 @@ public interface DatabaseProxy<K, V> {
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
- * @param delta value to add
+ * @param delta value to add
* @return previous value
*/
CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
+
+ /**
+ * Atomically sets the given value to current value of the specified counter.
+ *
+ * @param counterName counter name
+ * @param value value to set
+ * @return void future
+ */
+ CompletableFuture<Void> counterSet(String counterName, long value);
+
+ /**
+ * Atomically sets the given counter to the specified update value if and only if the current value is equal to the
+ * expected value.
+ * @param counterName counter name
+ * @param expectedValue value to use for equivalence check
+ * @param update value to set if expected value is current value
+ * @return true if an update occurred, false otherwise
+ */
+ CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
+
/**
* Returns the current value of the specified atomic counter.
*
@@ -158,6 +179,7 @@ public interface DatabaseProxy<K, V> {
/**
* Returns the size of queue.
+ *
* @param queueName queue name
* @return queue size
*/
@@ -165,14 +187,16 @@ public interface DatabaseProxy<K, V> {
/**
* Inserts an entry into the queue.
+ *
* @param queueName queue name
- * @param entry queue entry
+ * @param entry queue entry
* @return void future
*/
CompletableFuture<Void> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
+ *
* @param queueName queue name
* @return entry future. Can be completed with null if queue is empty
*/
@@ -180,6 +204,7 @@ public interface DatabaseProxy<K, V> {
/**
* Returns but does not remove an entry from the queue.
+ *
* @param queueName queue name
* @return entry. Can be null if queue is empty
*/
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index b3dd1c44..1136428b 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -16,18 +16,17 @@
package org.onosproject.store.consistent.impl;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* Database state.
@@ -83,6 +82,9 @@ public interface DatabaseState<K, V> {
Long counterAddAndGet(String counterName, long delta);
@Command
+ Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
+
+ @Command
Long counterGetAndAdd(String counterName, long delta);
@Query
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index 7a439c34..d851eaa0 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import java.util.concurrent.CompletableFuture;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -38,6 +39,8 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
private static final String GET_AND_ADD = "getAndAdd";
private static final String ADD_AND_GET = "addAndGet";
private static final String GET = "get";
+ private static final String SET = "set";
+ private static final String COMPARE_AND_SET = "compareAndSet";
public DefaultAsyncAtomicCounter(String name,
Database database,
@@ -72,13 +75,27 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
public CompletableFuture<Long> getAndAdd(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
return database.counterGetAndAdd(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
return database.counterAddAndGet(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Void> set(long value) {
+ final MeteringAgent.Context timer = monitor.startTimer(SET);
+ return database.counterSet(name, value)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
+ final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
+ return database.counterCompareAndSet(name, expectedValue, updateValue)
+ .whenComplete((r, e) -> timer.stop(e));
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 64886e41..2d6a956c 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -63,6 +63,16 @@ public class DefaultAtomicCounter implements AtomicCounter {
}
@Override
+ public void set(long value) {
+ complete(asyncCounter.set(value));
+ }
+
+ @Override
+ public boolean compareAndSet(long expectedValue, long updateValue) {
+ return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
+ }
+
+ @Override
public long get() {
return complete(asyncCounter.get());
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 4d9776ee..2a50fbd6 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -16,12 +16,15 @@
package org.onosproject.store.consistent.impl;
-import net.kuujo.copycat.state.StateMachine;
+import com.google.common.collect.Sets;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;
+import net.kuujo.copycat.state.StateMachine;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
@@ -30,11 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Sets;
-
/**
* Default database.
*/
@@ -44,7 +42,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
public DefaultDatabase(ResourceManager context) {
super(context);
this.stateMachine = new DefaultStateMachine(context,
@@ -66,7 +64,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
* return the completed future result.
*
* @param supplier The supplier to call if the database is open.
- * @param <T> The future result type.
+ * @param <T> The future result type.
* @return A completable future that if this database is closed is immediately failed.
*/
protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
@@ -153,6 +151,16 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ return checkOpen(() -> proxy.counterSet(counterName, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
+ return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
+ }
+
+ @Override
public CompletableFuture<Long> queueSize(String queueName) {
return checkOpen(() -> proxy.queueSize(queueName));
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 9a55ffb1..8943fc87 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -16,27 +16,26 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import java.util.Set;
-
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import net.kuujo.copycat.state.Initializer;
-import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
@@ -195,6 +194,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
@Override
+ public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+ return getCounter(counterName).compareAndSet(expectedValue, updateValue);
+ }
+
+ @Override
public Long counterGet(String counterName) {
return getCounter(counterName).get();
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java
new file mode 100644
index 00000000..d8593e37
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode.State;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MutexExecutionService;
+import org.onosproject.store.service.MutexTask;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of a MutexExecutionService.
+ */
+@Component(immediate = true)
+@Service
+public class MutexExecutionManager implements MutexExecutionService {
+
+ private final Logger log = getLogger(getClass());
+
+ protected ConsistentMap<String, MutexState> lockMap;
+ protected NodeId localNodeId;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
+ private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
+
+ private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
+ private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ lockMap = storageService.<String, MutexState>consistentMapBuilder()
+ .withName("onos-mutexes")
+ .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
+ .withPartitionsDisabled()
+ .build();
+ lockMap.addListener(mapEventListener);
+ clusterService.addListener(clusterEventListener);
+ releaseOldLocks();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ lockMap.removeListener(mapEventListener);
+ pending.values().forEach(future -> future.cancel(true));
+ activeTasks.forEach((k, v) -> {
+ v.stop();
+ unlock(k);
+ });
+ clusterService.removeListener(clusterEventListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
+ return lock(exclusionPath)
+ .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
+ k -> new InnerMutexTask(exclusionPath,
+ task,
+ state.term())))
+ .thenAcceptAsync(t -> t.start(), executor)
+ .whenComplete((r, e) -> unlock(exclusionPath));
+ }
+
+ protected CompletableFuture<MutexState> lock(String exclusionPath) {
+ CompletableFuture<MutexState> future =
+ pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
+ tryLock(exclusionPath);
+ return future;
+ }
+
+ /**
+ * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
+ * the wait list.
+ * @param exclusionPath exclusion path
+ */
+ protected void tryLock(String exclusionPath) {
+ Tools.retryable(() -> lockMap.asJavaMap()
+ .compute(exclusionPath,
+ (k, v) -> MutexState.admit(v, localNodeId)),
+ ConsistentMapException.ConcurrentModification.class,
+ Integer.MAX_VALUE,
+ 100).get();
+ }
+
+ /**
+ * Releases lock for the specific path. This operation is idempotent.
+ * @param exclusionPath exclusion path
+ */
+ protected void unlock(String exclusionPath) {
+ Tools.retryable(() -> lockMap.asJavaMap()
+ .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
+ ConsistentMapException.ConcurrentModification.class,
+ Integer.MAX_VALUE,
+ 100).get();
+ }
+
+ /**
+ * Detects and releases all locks held by this node.
+ */
+ private void releaseOldLocks() {
+ Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
+ .keySet()
+ .forEach(path -> {
+ log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
+ unlock(path);
+ });
+ }
+
+ private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
+
+ @Override
+ public void event(MapEvent<String, MutexState> event) {
+ log.debug("Received {}", event);
+ if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
+ pending.computeIfPresent(event.key(), (k, future) -> {
+ MutexState state = Versioned.valueOrElse(event.value(), null);
+ if (state != null && localNodeId.equals(state.holder())) {
+ log.debug("Local node is now owner for {}", event.key());
+ future.complete(state);
+ return null;
+ } else {
+ return future;
+ }
+ });
+ InnerMutexTask task = activeTasks.get(event.key());
+ if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
+ task.stop();
+ }
+ }
+ }
+ }
+
+ private class InternalClusterEventListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
+ event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
+ NodeId nodeId = event.subject().id();
+ log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
+ lockMap.asJavaMap().forEach((k, v) -> {
+ if (v.contains(nodeId)) {
+ lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
+ }
+ });
+ }
+ long activeNodes = clusterService.getNodes()
+ .stream()
+ .map(node -> clusterService.getState(node.id()))
+ .filter(State.ACTIVE::equals)
+ .count();
+ if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
+ log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
+ activeTasks.forEach((k, v) -> {
+ v.stop();
+ });
+ }
+ }
+ }
+
+ private static final class MutexState {
+
+ private final NodeId holder;
+ private final List<NodeId> waitList;
+ private final long term;
+
+ public static MutexState admit(MutexState state, NodeId nodeId) {
+ if (state == null) {
+ return new MutexState(nodeId, 1L, Lists.newArrayList());
+ } else if (state.holder() == null) {
+ return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
+ } else {
+ if (!state.contains(nodeId)) {
+ NodeId newHolder = state.holder();
+ List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
+ newWaitList.add(nodeId);
+ return new MutexState(newHolder, state.term(), newWaitList);
+ } else {
+ return state;
+ }
+ }
+ }
+
+ public static MutexState evict(MutexState state, NodeId nodeId) {
+ return state.evict(nodeId);
+ }
+
+ public MutexState evict(NodeId nodeId) {
+ if (nodeId.equals(holder)) {
+ if (waitList.isEmpty()) {
+ return new MutexState(null, term, waitList);
+ }
+ List<NodeId> newWaitList = Lists.newArrayList(waitList);
+ NodeId newHolder = newWaitList.remove(0);
+ return new MutexState(newHolder, term + 1, newWaitList);
+ } else {
+ NodeId newHolder = holder;
+ List<NodeId> newWaitList = Lists.newArrayList(waitList);
+ newWaitList.remove(nodeId);
+ return new MutexState(newHolder, term, newWaitList);
+ }
+ }
+
+ public NodeId holder() {
+ return holder;
+ }
+
+ public List<NodeId> waitList() {
+ return waitList;
+ }
+
+ public long term() {
+ return term;
+ }
+
+ private boolean contains(NodeId nodeId) {
+ return (nodeId.equals(holder) || waitList.contains(nodeId));
+ }
+
+ private MutexState(NodeId holder, long term, List<NodeId> waitList) {
+ this.holder = holder;
+ this.term = term;
+ this.waitList = Lists.newArrayList(waitList);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("holder", holder)
+ .add("term", term)
+ .add("waitList", waitList)
+ .toString();
+ }
+ }
+
+ private class InnerMutexTask implements MutexTask {
+ private final MutexTask task;
+ private final String mutexPath;
+ private final long term;
+
+ public InnerMutexTask(String mutexPath, MutexTask task, long term) {
+ this.mutexPath = mutexPath;
+ this.term = term;
+ this.task = task;
+ }
+
+ public long term() {
+ return term;
+ }
+
+ @Override
+ public void start() {
+ log.debug("Starting execution for mutex task guarded by {}", mutexPath);
+ task.start();
+ log.debug("Finished execution for mutex task guarded by {}", mutexPath);
+ }
+
+ @Override
+ public void stop() {
+ log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
+ task.stop();
+ }
+ }
+} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index a294681e..f741b367 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -16,6 +16,17 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
+import net.kuujo.copycat.resource.ResourceState;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,18 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import net.kuujo.copycat.Task;
-import net.kuujo.copycat.cluster.Cluster;
-import net.kuujo.copycat.resource.ResourceState;
import static com.google.common.base.Preconditions.checkState;
/**
@@ -100,10 +99,10 @@ public class PartitionedDatabase implements Database {
return CompletableFuture.allOf(partitions
.stream()
.map(db -> db.counters()
- .thenApply(m -> {
- counters.putAll(m);
- return null;
- }))
+ .thenApply(m -> {
+ counters.putAll(m);
+ return null;
+ }))
.toArray(CompletableFuture[]::new))
.thenApply(v -> counters);
}
@@ -113,9 +112,9 @@ public class PartitionedDatabase implements Database {
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> totalSize.get());
}
@@ -136,10 +135,10 @@ public class PartitionedDatabase implements Database {
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapContainsValue(mapName, value)
- .thenApply(v -> containsValue.compareAndSet(false, v)))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapContainsValue(mapName, value)
+ .thenApply(v -> containsValue.compareAndSet(false, v)))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> containsValue.get());
}
@@ -196,9 +195,9 @@ public class PartitionedDatabase implements Database {
checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> entrySet);
}
@@ -220,6 +219,19 @@ public class PartitionedDatabase implements Database {
return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
}
+ @Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).
+ counterCompareAndSet(counterName, expectedValue, updateValue);
+
+ }
@Override
public CompletableFuture<Long> queueSize(String queueName) {
@@ -268,8 +280,8 @@ public class PartitionedDatabase implements Database {
AtomicBoolean status = new AtomicBoolean(true);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
- .map(entry -> entry
- .getKey()
+ .map(entry -> entry
+ .getKey()
.prepare(entry.getValue())
.thenApply(v -> status.compareAndSet(true, v)))
.toArray(CompletableFuture[]::new))
@@ -282,15 +294,15 @@ public class PartitionedDatabase implements Database {
AtomicBoolean success = new AtomicBoolean(true);
List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().commit(entry.getValue())
- .thenAccept(response -> {
- success.set(success.get() && response.success());
- if (success.get()) {
- allUpdates.addAll(response.updates());
- }
- }))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue())
+ .thenAccept(response -> {
+ success.set(success.get() && response.success());
+ if (success.get()) {
+ allUpdates.addAll(response.updates());
+ }
+ }))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> success.get() ?
CommitResponse.success(allUpdates) : CommitResponse.failure());
}
@@ -301,7 +313,7 @@ public class PartitionedDatabase implements Database {
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().rollback(entry.getValue()))
- .toArray(CompletableFuture[]::new))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> true);
}
@@ -384,3 +396,4 @@ public class PartitionedDatabase implements Database {
partitions.forEach(p -> p.unregisterConsumer(consumer));
}
}
+
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2859b62f..f1e0dbd4 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
@@ -33,18 +32,15 @@ import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -67,6 +63,8 @@ import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
/**
* Distributed Map implementation which uses optimistic replication and gossip
@@ -359,7 +357,7 @@ public class EventuallyConsistentMapImpl<K, V>
valueMatches = Objects.equals(value.get(), existing.get());
}
if (existing == null) {
- log.debug("ECMap Remove: Existing value for key {} is already null", k);
+ log.trace("ECMap Remove: Existing value for key {} is already null", k);
}
if (valueMatches) {
if (existing == null) {
@@ -523,7 +521,7 @@ public class EventuallyConsistentMapImpl<K, V>
return;
}
peers.forEach(node ->
- senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+ senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
);
}
@@ -576,8 +574,10 @@ public class EventuallyConsistentMapImpl<K, V>
return;
}
try {
- log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
- mapName, ad.sender(), ad.digest().size());
+ if (log.isTraceEnabled()) {
+ log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
+ mapName, ad.sender(), ad.digest().size());
+ }
antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
if (!lightweightAntiEntropy) {
@@ -675,4 +675,4 @@ public class EventuallyConsistentMapImpl<K, V>
});
}
}
-} \ No newline at end of file
+}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index de7a3ac3..8cd63e7d 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.flow.impl;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -57,6 +58,7 @@ import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -64,9 +66,16 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -151,6 +160,13 @@ public class NewDistributedFlowRuleStore
private final ScheduledExecutorService backupSenderExecutor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
+ private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
+ private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
+ new InternalTableStatsListener();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -161,6 +177,11 @@ public class NewDistributedFlowRuleStore
}
};
+ protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MastershipBasedTimestamp.class);
+
+
private IdGenerator idGenerator;
private NodeId local;
@@ -186,6 +207,15 @@ public class NewDistributedFlowRuleStore
TimeUnit.MILLISECONDS);
}
+ deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
+ .withName("onos-flow-table-stats")
+ .withSerializer(SERIALIZER_BUILDER)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+ deviceTableStats.addListener(tableStatsListener);
+
logConfig("Started");
}
@@ -197,6 +227,8 @@ public class NewDistributedFlowRuleStore
}
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
+ deviceTableStats.removeListener(tableStatsListener);
+ deviceTableStats.destroy();
messageHandlingExecutor.shutdownNow();
backupSenderExecutor.shutdownNow();
log.info("Stopped");
@@ -786,4 +818,36 @@ public class NewDistributedFlowRuleStore
return backedupDevices;
}
}
+
+ @Override
+ public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
+ List<TableStatisticsEntry> tableStats) {
+ deviceTableStats.put(deviceId, tableStats);
+ return null;
+ }
+
+ @Override
+ public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.debug("Failed to getTableStats: No master for {}", deviceId);
+ return Collections.emptyList();
+ }
+
+ List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
+ if (tableStats == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(tableStats);
+ }
+
+ private class InternalTableStatsListener
+ implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceId,
+ List<TableStatisticsEntry>> event) {
+ //TODO: Generate an event to listeners (do we need?)
+ }
+ }
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 97333ebf..a999ee7f 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -28,19 +28,11 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.flow.instructions.L0ModificationInstruction;
-import org.onosproject.net.flow.instructions.L2ModificationInstruction;
-import org.onosproject.net.flow.instructions.L3ModificationInstruction;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
@@ -61,9 +53,7 @@ import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.MultiValuedTimestamp;
-import org.onosproject.store.serializers.DeviceIdSerializer;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.URISerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
@@ -71,7 +61,6 @@ import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -139,9 +128,12 @@ public class DistributedGroupStore
private final AtomicLong sequenceNumber = new AtomicLong(0);
+ private KryoNamespace clusterMsgSerializer;
+
@Activate
public void activate() {
kryoBuilder = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
.register(DefaultGroup.class,
DefaultGroupBucket.class,
DefaultGroupDescription.class,
@@ -158,38 +150,9 @@ public class DistributedGroupStore
GroupStoreKeyMapKey.class,
GroupStoreIdMapKey.class,
GroupStoreMapKey.class
- )
- .register(new URISerializer(), URI.class)
- .register(new DeviceIdSerializer(), DeviceId.class)
- .register(PortNumber.class)
- .register(DefaultApplicationId.class)
- .register(DefaultTrafficTreatment.class,
- Instructions.DropInstruction.class,
- Instructions.OutputInstruction.class,
- Instructions.GroupInstruction.class,
- Instructions.TableTypeTransition.class,
- FlowRule.Type.class,
- L0ModificationInstruction.class,
- L0ModificationInstruction.L0SubType.class,
- L0ModificationInstruction.ModLambdaInstruction.class,
- L2ModificationInstruction.class,
- L2ModificationInstruction.L2SubType.class,
- L2ModificationInstruction.ModEtherInstruction.class,
- L2ModificationInstruction.PushHeaderInstructions.class,
- L2ModificationInstruction.ModVlanIdInstruction.class,
- L2ModificationInstruction.ModVlanPcpInstruction.class,
- L2ModificationInstruction.ModMplsLabelInstruction.class,
- L2ModificationInstruction.ModMplsTtlInstruction.class,
- L3ModificationInstruction.class,
- L3ModificationInstruction.L3SubType.class,
- L3ModificationInstruction.ModIPInstruction.class,
- L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
- L3ModificationInstruction.ModTtlInstruction.class,
- org.onlab.packet.MplsLabel.class
- )
- .register(org.onosproject.cluster.NodeId.class)
- .register(KryoNamespaces.BASIC)
- .register(KryoNamespaces.MISC);
+ );
+
+ clusterMsgSerializer = kryoBuilder.build();
messageHandlingExecutor = Executors.
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
@@ -197,7 +160,7 @@ public class DistributedGroupStore
"message-handlers"));
clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build()::deserialize,
+ clusterMsgSerializer::deserialize,
this::process,
messageHandlingExecutor);
@@ -233,6 +196,7 @@ public class DistributedGroupStore
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
groupStoreEntriesByKey.destroy();
auditPendingReqQueue.destroy();
log.info("Stopped");
@@ -313,8 +277,6 @@ public class DistributedGroupStore
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
- log.debug("getGroups: for device {} total number of groups {}",
- deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId))
.transform(input -> input);
@@ -322,8 +284,6 @@ public class DistributedGroupStore
private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
- log.debug("getGroups: for device {} total number of groups {}",
- deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId));
}
@@ -411,7 +371,7 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
+ clusterMsgSerializer::serialize,
mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
@@ -609,7 +569,7 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
+ clusterMsgSerializer::serialize,
mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
@@ -741,7 +701,7 @@ public class DistributedGroupStore
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
+ clusterMsgSerializer::serialize,
mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
index d0b827cd..f9c96891 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
@@ -27,6 +27,7 @@ import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.RE
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,7 +68,6 @@ import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
@@ -197,6 +197,35 @@ public class ECHostStore
}
@Override
+ public HostEvent removeIp(HostId hostId, IpAddress ipAddress) {
+ DefaultHost host = hosts.compute(hostId, (id, existingHost) -> {
+ if (existingHost != null) {
+ checkState(Objects.equals(hostId.mac(), existingHost.mac()),
+ "Existing and new MAC addresses differ.");
+ checkState(Objects.equals(hostId.vlanId(), existingHost.vlan()),
+ "Existing and new VLANs differ.");
+
+ Set<IpAddress> addresses = existingHost.ipAddresses();
+ if (addresses != null && addresses.contains(ipAddress)) {
+ addresses = new HashSet<>(existingHost.ipAddresses());
+ addresses.remove(ipAddress);
+ return new DefaultHost(existingHost.providerId(),
+ hostId,
+ existingHost.mac(),
+ existingHost.vlan(),
+ existingHost.location(),
+ ImmutableSet.copyOf(addresses),
+ existingHost.annotations());
+ } else {
+ return existingHost;
+ }
+ }
+ return null;
+ });
+ return host != null ? new HostEvent(HOST_UPDATED, host) : null;
+ }
+
+ @Override
public int getHostCount() {
return hosts.size();
}
@@ -228,17 +257,23 @@ public class ECHostStore
@Override
public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
- return ImmutableSet.copyOf(locations.get(connectPoint));
+ synchronized (locations) {
+ return ImmutableSet.copyOf(locations.get(connectPoint));
+ }
}
@Override
public Set<Host> getConnectedHosts(DeviceId deviceId) {
- return ImmutableMultimap.copyOf(locations)
- .entries()
- .stream()
- .filter(entry -> entry.getKey().deviceId().equals(deviceId))
- .map(entry -> entry.getValue())
- .collect(Collectors.toSet());
+ Set<Host> filtered;
+ synchronized (locations) {
+ filtered = locations
+ .entries()
+ .stream()
+ .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+ .map(entry -> entry.getValue())
+ .collect(Collectors.toSet());
+ }
+ return ImmutableSet.copyOf(filtered);
}
private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) {
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index fa3a0751..1e5db99c 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -193,7 +193,7 @@ public class GossipIntentStore
private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
NodeId master = partitionService.getLeader(key);
NodeId origin = (data != null) ? data.origin() : null;
- if (master == null || origin == null) {
+ if (data != null && (master == null || origin == null)) {
log.debug("Intent {} missing master and/or origin; master = {}, origin = {}",
key, master, origin);
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 105c77df..47aa85c5 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -826,7 +826,7 @@ public class GossipLinkStore
public void handle(ClusterMessage message) {
log.trace("Received link event from peer: {}", message.sender());
- InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());
+ InternalLinkEvent event = SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
@@ -845,7 +845,7 @@ public class GossipLinkStore
public void handle(ClusterMessage message) {
log.trace("Received link removed event from peer: {}", message.sender());
- InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());
+ InternalLinkRemovedEvent event = SERIALIZER.decode(message.payload());
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index d4c89c93..f0f3eb5e 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,7 +15,9 @@
*/
package org.onosproject.store.packet.impl;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,14 +43,13 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -117,6 +118,7 @@ public class DistributedPacketStore
public void deactivate() {
communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
messageHandlingExecutor.shutdown();
+ tracker = null;
log.info("Stopped");
}
@@ -143,13 +145,13 @@ public class DistributedPacketStore
}
@Override
- public boolean requestPackets(PacketRequest request) {
- return tracker.add(request);
+ public void requestPackets(PacketRequest request) {
+ tracker.add(request);
}
@Override
- public boolean cancelPackets(PacketRequest request) {
- return tracker.remove(request);
+ public void cancelPackets(PacketRequest request) {
+ tracker.remove(request);
}
@Override
@@ -169,33 +171,50 @@ public class DistributedPacketStore
.build();
}
- public boolean add(PacketRequest request) {
- Versioned<Set<PacketRequest>> old = requests.get(request.selector());
- if (old != null && old.value().contains(request)) {
- return false;
+ public void add(PacketRequest request) {
+ AtomicBoolean firstRequest = new AtomicBoolean(false);
+ requests.compute(request.selector(), (s, existingRequests) -> {
+ if (existingRequests == null) {
+ firstRequest.set(true);
+ return ImmutableSet.of(request);
+ } else if (!existingRequests.contains(request)) {
+ return ImmutableSet.<PacketRequest>builder()
+ .addAll(existingRequests)
+ .add(request)
+ .build();
+ } else {
+ return existingRequests;
+ }
+ });
+
+ if (firstRequest.get() && delegate != null) {
+ // The instance that makes the first request will push to all devices
+ delegate.requestPackets(request);
}
- // FIXME: add retry logic using a random delay
- Set<PacketRequest> newSet = new HashSet<>();
- newSet.add(request);
- if (old == null) {
- return requests.putIfAbsent(request.selector(), newSet) == null;
- }
- newSet.addAll(old.value());
- return requests.replace(request.selector(), old.version(), newSet);
}
- public boolean remove(PacketRequest request) {
- Versioned<Set<PacketRequest>> old = requests.get(request.selector());
- if (old == null || !old.value().contains(request)) {
- return false;
- }
- // FIXME: add retry logic using a random delay
- Set<PacketRequest> newSet = new HashSet<>(old.value());
- newSet.remove(request);
- if (newSet.isEmpty()) {
- return requests.remove(request.selector(), old.version());
+ public void remove(PacketRequest request) {
+ AtomicBoolean removedLast = new AtomicBoolean(false);
+ requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+ if (existingRequests.contains(request)) {
+ Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+ newRequests.remove(request);
+ if (newRequests.size() > 0) {
+ return ImmutableSet.copyOf(newRequests);
+ } else {
+ removedLast.set(true);
+ return null;
+ }
+ } else {
+ return existingRequests;
+ }
+ });
+
+ if (removedLast.get() && delegate != null) {
+ // The instance that removes the last request will remove from all devices
+ delegate.cancelPackets(request);
}
- return requests.replace(request.selector(), old.version(), newSet);
+
}
public List<PacketRequest> requests() {
@@ -204,6 +223,5 @@ public class DistributedPacketStore
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
return list;
}
-
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java
new file mode 100644
index 00000000..87e67215
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.resource.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.intent.IntentId;
+import org.onosproject.net.resource.device.IntentSetMultimap;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A collection that maps Intent IDs as keys to values as Intent IDs,
+ * where each key may associated with multiple values without duplication.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class ConsistentIntentSetMultimap implements IntentSetMultimap {
+ private final Logger log = getLogger(getClass());
+
+ private static final String INTENT_MAPPING = "IntentMapping";
+
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
+
+ private ConsistentMap<IntentId, Set<IntentId>> intentMapping;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Activate
+ public void activate() {
+ intentMapping = storageService.<IntentId, Set<IntentId>>consistentMapBuilder()
+ .withName(INTENT_MAPPING)
+ .withSerializer(SERIALIZER)
+ .build();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public Set<IntentId> getMapping(IntentId intentId) {
+ Versioned<Set<IntentId>> result = intentMapping.get(intentId);
+
+ if (result != null) {
+ return result.value();
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean allocateMapping(IntentId keyIntentId, IntentId valIntentId) {
+ Versioned<Set<IntentId>> versionedIntents = intentMapping.get(keyIntentId);
+
+ if (versionedIntents == null) {
+ Set<IntentId> newSet = new HashSet<>();
+ newSet.add(valIntentId);
+ intentMapping.put(keyIntentId, newSet);
+ } else {
+ versionedIntents.value().add(valIntentId);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void releaseMapping(IntentId intentId) {
+ for (IntentId intent : intentMapping.keySet()) {
+ // TODO: optimize by checking for identical src & dst
+ Set<IntentId> mapping = intentMapping.get(intent).value();
+ if (mapping.remove(intentId)) {
+ return;
+ }
+ }
+ }
+
+}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java
index 3a296353..11137aa2 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java
@@ -40,7 +40,6 @@ import org.onosproject.net.Link;
import org.onosproject.net.LinkKey;
import org.onosproject.net.Port;
import org.onosproject.net.intent.IntentId;
-import org.onosproject.net.link.LinkService;
import org.onosproject.net.resource.link.BandwidthResource;
import org.onosproject.net.resource.link.BandwidthResourceAllocation;
import org.onosproject.net.resource.link.LambdaResource;
@@ -69,7 +68,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.net.AnnotationKeys.BANDWIDTH;
@@ -108,9 +106,6 @@ public class ConsistentLinkResourceStore extends
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LinkService linkService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Activate
@@ -139,29 +134,30 @@ public class ConsistentLinkResourceStore extends
return storageService.transactionContextBuilder().build();
}
- private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) {
- if (type == ResourceType.BANDWIDTH) {
- return ImmutableSet.of(getBandwidthResourceCapacity(link));
- }
- if (type == ResourceType.LAMBDA) {
- return getLambdaResourceCapacity(link);
+ private Set<ResourceAllocation> getResourceCapacity(ResourceType type, Link link) {
+ switch (type) {
+ case BANDWIDTH:
+ return ImmutableSet.of(getBandwidthResourceCapacity(link));
+ case LAMBDA:
+ return getLambdaResourceCapacity(link);
+ case MPLS_LABEL:
+ return getMplsResourceCapacity();
+ default:
+ return ImmutableSet.of();
}
- if (type == ResourceType.MPLS_LABEL) {
- return getMplsResourceCapacity();
- }
- return ImmutableSet.of();
}
- private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) {
- Set<LambdaResourceAllocation> allocations = new HashSet<>();
+ private Set<ResourceAllocation> getLambdaResourceCapacity(Link link) {
Port port = deviceService.getPort(link.src().deviceId(), link.src().port());
- if (port instanceof OmsPort) {
- OmsPort omsPort = (OmsPort) port;
+ if (!(port instanceof OmsPort)) {
+ return Collections.emptySet();
+ }
- // Assume fixed grid for now
- for (int i = 0; i < omsPort.totalChannels(); i++) {
- allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i)));
- }
+ OmsPort omsPort = (OmsPort) port;
+ Set<ResourceAllocation> allocations = new HashSet<>();
+ // Assume fixed grid for now
+ for (int i = 0; i < omsPort.totalChannels(); i++) {
+ allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i)));
}
return allocations;
}
@@ -170,26 +166,23 @@ public class ConsistentLinkResourceStore extends
// if Link annotation exist, use them
// if all fails, use DEFAULT_BANDWIDTH
- BandwidthResource bandwidth = null;
+ BandwidthResource bandwidth = DEFAULT_BANDWIDTH;
String strBw = link.annotations().value(BANDWIDTH);
- if (strBw != null) {
- try {
- bandwidth = new BandwidthResource(Bandwidth.mbps(Double.parseDouble(strBw)));
- } catch (NumberFormatException e) {
- // do nothings
- bandwidth = null;
- }
+ if (strBw == null) {
+ return new BandwidthResourceAllocation(bandwidth);
}
- if (bandwidth == null) {
- // fall back, use fixed default
+ try {
+ bandwidth = new BandwidthResource(Bandwidth.mbps(Double.parseDouble(strBw)));
+ } catch (NumberFormatException e) {
+ // do nothings, use default bandwidth
bandwidth = DEFAULT_BANDWIDTH;
}
return new BandwidthResourceAllocation(bandwidth);
}
- private Set<MplsLabelResourceAllocation> getMplsResourceCapacity() {
- Set<MplsLabelResourceAllocation> allocations = new HashSet<>();
+ private Set<ResourceAllocation> getMplsResourceCapacity() {
+ Set<ResourceAllocation> allocations = new HashSet<>();
//Ignoring reserved labels of 0 through 15
for (int i = MIN_UNRESERVED_LABEL; i <= MAX_UNRESERVED_LABEL; i++) {
allocations.add(new MplsLabelResourceAllocation(MplsLabel
@@ -199,13 +192,11 @@ public class ConsistentLinkResourceStore extends
return allocations;
}
- private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) {
- Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>();
+ private Map<ResourceType, Set<ResourceAllocation>> getResourceCapacity(Link link) {
+ Map<ResourceType, Set<ResourceAllocation>> caps = new HashMap<>();
for (ResourceType type : ResourceType.values()) {
- Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link);
- if (cap != null) {
- caps.put(type, cap);
- }
+ Set<ResourceAllocation> cap = getResourceCapacity(type, link);
+ caps.put(type, cap);
}
return caps;
}
@@ -216,106 +207,80 @@ public class ConsistentLinkResourceStore extends
tx.begin();
try {
- Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(tx, link);
- Set<ResourceAllocation> allFree = new HashSet<>();
- freeResources.values().forEach(allFree::addAll);
- return allFree;
+ Map<ResourceType, Set<ResourceAllocation>> freeResources = getFreeResourcesEx(tx, link);
+ return freeResources.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
} finally {
tx.abort();
}
}
- private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(TransactionContext tx, Link link) {
+ private Map<ResourceType, Set<ResourceAllocation>> getFreeResourcesEx(TransactionContext tx, Link link) {
checkNotNull(tx);
checkNotNull(link);
- Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>();
- final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link);
- final Iterable<LinkResourceAllocations> allocations = getAllocations(tx, link);
+ Map<ResourceType, Set<ResourceAllocation>> free = new HashMap<>();
+ final Map<ResourceType, Set<ResourceAllocation>> caps = getResourceCapacity(link);
+ final List<LinkResourceAllocations> allocations = ImmutableList.copyOf(getAllocations(tx, link));
- for (ResourceType type : ResourceType.values()) {
- // there should be class/category of resources
+ Set<ResourceAllocation> bw = caps.get(ResourceType.BANDWIDTH);
+ Set<ResourceAllocation> value = getFreeBandwidthResources(link, bw, allocations);
+ free.put(ResourceType.BANDWIDTH, value);
- switch (type) {
- case BANDWIDTH:
- Set<? extends ResourceAllocation> bw = caps.get(type);
- if (bw == null || bw.isEmpty()) {
- bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW));
- }
+ Set<ResourceAllocation> lmd = caps.get(ResourceType.LAMBDA);
+ Set<ResourceAllocation> freeL = getFreeResources(link, lmd, allocations,
+ LambdaResourceAllocation.class);
+ free.put(ResourceType.LAMBDA, freeL);
- BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next();
- double freeBw = cap.bandwidth().toDouble();
-
- // enumerate current allocations, subtracting resources
- for (LinkResourceAllocations alloc : allocations) {
- Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
- for (ResourceAllocation a : types) {
- if (a instanceof BandwidthResourceAllocation) {
- BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a;
- freeBw -= bwA.bandwidth().toDouble();
- }
- }
- }
+ Set<ResourceAllocation> mpls = caps.get(ResourceType.MPLS_LABEL);
+ Set<ResourceAllocation> freeLabel = getFreeResources(link, mpls, allocations,
+ MplsLabelResourceAllocation.class);
+ free.put(ResourceType.MPLS_LABEL, freeLabel);
- free.put(type, Sets.newHashSet(
- new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(freeBw)))));
- break;
- case LAMBDA:
- Set<? extends ResourceAllocation> lmd = caps.get(type);
- if (lmd == null || lmd.isEmpty()) {
- // nothing left
- break;
- }
- Set<LambdaResourceAllocation> freeL = new HashSet<>();
- for (ResourceAllocation r : lmd) {
- if (r instanceof LambdaResourceAllocation) {
- freeL.add((LambdaResourceAllocation) r);
- }
- }
-
- // enumerate current allocations, removing resources
- for (LinkResourceAllocations alloc : allocations) {
- Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
- for (ResourceAllocation a : types) {
- if (a instanceof LambdaResourceAllocation) {
- freeL.remove(a);
- }
- }
- }
+ return free;
+ }
- free.put(type, freeL);
- break;
- case MPLS_LABEL:
- Set<? extends ResourceAllocation> mpls = caps.get(type);
- if (mpls == null || mpls.isEmpty()) {
- // nothing left
- break;
- }
- Set<MplsLabelResourceAllocation> freeLabel = new HashSet<>();
- for (ResourceAllocation r : mpls) {
- if (r instanceof MplsLabelResourceAllocation) {
- freeLabel.add((MplsLabelResourceAllocation) r);
- }
- }
+ private Set<ResourceAllocation> getFreeBandwidthResources(Link link, Set<ResourceAllocation> bw,
+ List<LinkResourceAllocations> allocations) {
+ if (bw == null || bw.isEmpty()) {
+ bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW));
+ }
- // enumerate current allocations, removing resources
- for (LinkResourceAllocations alloc : allocations) {
- Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
- for (ResourceAllocation a : types) {
- if (a instanceof MplsLabelResourceAllocation) {
- freeLabel.remove(a);
- }
- }
- }
+ BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next();
+ double freeBw = cap.bandwidth().toDouble();
+
+ // enumerate current allocations, subtracting resources
+ double allocatedBw = allocations.stream()
+ .flatMap(x -> x.getResourceAllocation(link).stream())
+ .filter(x -> x instanceof BandwidthResourceAllocation)
+ .map(x -> (BandwidthResourceAllocation) x)
+ .mapToDouble(x -> x.bandwidth().toDouble())
+ .sum();
+ freeBw -= allocatedBw;
+ return Sets.newHashSet(
+ new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(freeBw))));
+ }
- free.put(type, freeLabel);
- break;
- default:
- log.debug("unsupported ResourceType {}", type);
- break;
- }
+ private Set<ResourceAllocation> getFreeResources(Link link,
+ Set<ResourceAllocation> resources,
+ List<LinkResourceAllocations> allocations,
+ Class<? extends ResourceAllocation> cls) {
+ if (resources == null || resources.isEmpty()) {
+ // nothing left
+ return Collections.emptySet();
}
- return free;
+ Set<ResourceAllocation> freeL = resources.stream()
+ .filter(cls::isInstance)
+ .collect(Collectors.toSet());
+
+ // enumerate current allocations, removing resources
+ List<ResourceAllocation> allocated = allocations.stream()
+ .flatMap(x -> x.getResourceAllocation(link).stream())
+ .filter(cls::isInstance)
+ .collect(Collectors.toList());
+ freeL.removeAll(allocated);
+ return freeL;
}
@Override
@@ -329,6 +294,9 @@ public class ConsistentLinkResourceStore extends
intentAllocs.put(allocations.intentId(), allocations);
allocations.links().forEach(link -> allocateLinkResource(tx, link, allocations));
tx.commit();
+ } catch (TransactionException | ResourceAllocationException e) {
+ log.error("Exception thrown, rolling back", e);
+ tx.abort();
} catch (Exception e) {
log.error("Exception thrown, rolling back", e);
tx.abort();
@@ -340,15 +308,13 @@ public class ConsistentLinkResourceStore extends
LinkResourceAllocations allocations) {
// requested resources
Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link);
- Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(tx, link);
+ Map<ResourceType, Set<ResourceAllocation>> available = getFreeResourcesEx(tx, link);
for (ResourceAllocation req : reqs) {
- Set<? extends ResourceAllocation> avail = available.get(req.type());
+ Set<ResourceAllocation> avail = available.get(req.type());
if (req instanceof BandwidthResourceAllocation) {
// check if allocation should be accepted
if (avail.isEmpty()) {
- checkState(!avail.isEmpty(),
- "There's no Bandwidth resource on %s?",
- link);
+ throw new ResourceAllocationException(String.format("There's no Bandwidth resource on %s?", link));
}
BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next();
double bwLeft = bw.bandwidth().toDouble();
@@ -395,12 +361,7 @@ public class ConsistentLinkResourceStore extends
if (before == null) {
List<LinkResourceAllocations> after = new ArrayList<>();
after.add(allocations);
- before = linkAllocs.putIfAbsent(linkKey, after);
- if (before != null) {
- // concurrent allocation detected, retry transaction : is this needed?
- log.warn("Concurrent Allocation, retrying");
- throw new TransactionException();
- }
+ linkAllocs.putIfAbsent(linkKey, after);
} else {
List<LinkResourceAllocations> after = new ArrayList<>(before.size() + 1);
after.addAll(before);
@@ -500,19 +461,18 @@ public class ConsistentLinkResourceStore extends
checkNotNull(link);
final LinkKey key = LinkKey.linkKey(link);
TransactionalMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
- List<LinkResourceAllocations> res = null;
- res = linkAllocs.get(key);
- if (res == null) {
- res = linkAllocs.putIfAbsent(key, new ArrayList<>());
+ List<LinkResourceAllocations> res = linkAllocs.get(key);
+ if (res != null) {
+ return res;
+ }
- if (res == null) {
- return Collections.emptyList();
- } else {
- return res;
- }
+ res = linkAllocs.putIfAbsent(key, new ArrayList<>());
+ if (res == null) {
+ return Collections.emptyList();
+ } else {
+ return res;
}
- return res;
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
new file mode 100644
index 00000000..0cd4a831
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.statistic.impl;
+
+import com.google.common.base.Objects;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.statistic.FlowStatisticStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Maintains flow statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedFlowStatisticStore implements FlowStatisticStore {
+ private final Logger log = getLogger(getClass());
+
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private Map<ConnectPoint, Set<FlowEntry>> previous =
+ new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> current =
+ new ConcurrentHashMap<>();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ // register this store specific classes here
+ .build();
+ }
+ };
+
+ private NodeId local;
+ private ExecutorService messageHandlingExecutor;
+
+ private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
+
+ @Activate
+ public void activate() {
+ local = clusterService.getLocalNode().id();
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/statistic", "message-handlers"));
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(GET_PREVIOUS);
+ clusterCommunicator.removeSubscriber(GET_CURRENT);
+ messageHandlingExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public synchronized void removeFlowStatistic(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // remove this rule if present from current map
+ current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+
+ // remove this on if present from previous map
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ @Override
+ public synchronized void addFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // create one if absent and add this rule
+ current.putIfAbsent(cp, new HashSet<>());
+ current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
+
+ // remove previous one if present
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ public synchronized void updateFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ Set<FlowEntry> curr = current.get(cp);
+ if (curr == null) {
+ addFlowStatistic(rule);
+ } else {
+ Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
+ findAny();
+ if (f.isPresent() && rule.bytes() < f.get().bytes()) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " Invalid Flow Update! Will be removed!!" +
+ " curr flowId=" + Long.toHexString(rule.id().value()) +
+ ", prev flowId=" + Long.toHexString(f.get().id().value()) +
+ ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
+ ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
+ ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
+ // something is wrong! invalid flow entry, so delete it
+ removeFlowStatistic(rule);
+ return;
+ }
+ Set<FlowEntry> prev = previous.get(cp);
+ if (prev == null) {
+ prev = new HashSet<>();
+ previous.put(cp, prev);
+ }
+
+ // previous one is exist
+ if (f.isPresent()) {
+ // remove old one and add new one
+ prev.remove(rule);
+ if (!prev.add(f.get())) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into previous.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+
+ // remove old one and add new one
+ curr.remove(rule);
+ if (!curr.add(rule)) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into current.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+ }
+
+ @Override
+ public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getCurrentStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_CURRENT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+ return current.get(connectPoint);
+ }
+
+ @Override
+ public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getPreviousStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_PREVIOUS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+ return previous.get(connectPoint);
+ }
+
+ private ConnectPoint buildConnectPoint(FlowRule rule) {
+ PortNumber port = getOutput(rule);
+
+ if (port == null) {
+ return null;
+ }
+ ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+ return cp;
+ }
+
+ private PortNumber getOutput(FlowRule rule) {
+ for (Instruction i : rule.treatment().allInstructions()) {
+ if (i.type() == Instruction.Type.OUTPUT) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+ return out.port();
+ }
+ if (i.type() == Instruction.Type.DROP) {
+ return PortNumber.P0;
+ }
+ }
+ return null;
+ }
+} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java
index 487fad9b..da4e3cc4 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java
@@ -21,6 +21,7 @@ import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
+import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -40,6 +41,7 @@ import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.Path;
+import org.onosproject.net.DisjointPath;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.topology.ClusterId;
import org.onosproject.net.topology.DefaultGraphDescription;
@@ -74,7 +76,6 @@ public class DistributedTopologyStore
implements TopologyStore {
private final Logger log = getLogger(getClass());
-
private volatile DefaultTopology current =
new DefaultTopology(ProviderId.NONE,
new DefaultGraphDescription(0L, System.currentTimeMillis(),
@@ -167,6 +168,29 @@ public class DistributedTopologyStore
}
@Override
+ public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst) {
+ return defaultTopology(topology).getDisjointPaths(src, dst);
+ }
+
+ @Override
+ public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst,
+ LinkWeight weight) {
+ return defaultTopology(topology).getDisjointPaths(src, dst, weight);
+ }
+
+ @Override
+ public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst,
+ Map<Link, Object> riskProfile) {
+ return defaultTopology(topology).getDisjointPaths(src, dst, riskProfile);
+ }
+
+ @Override
+ public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst,
+ LinkWeight weight, Map<Link, Object> riskProfile) {
+ return defaultTopology(topology).getDisjointPaths(src, dst, weight, riskProfile);
+ }
+
+ @Override
public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
return defaultTopology(topology).isInfrastructure(connectPoint);
}
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
new file mode 100644
index 00000000..a7077a81
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.host.impl;
+
+import junit.framework.TestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.host.DefaultHostDescription;
+import org.onosproject.net.host.HostDescription;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for the ECHostStore.
+ */
+public class ECHostStoreTest extends TestCase {
+
+ private ECHostStore ecXHostStore;
+
+ private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a"));
+
+ private static final IpAddress IP1 = IpAddress.valueOf("10.2.0.2");
+ private static final IpAddress IP2 = IpAddress.valueOf("10.2.0.3");
+
+ private static final ProviderId PID = new ProviderId("of", "foo");
+
+ @Before
+ public void setUp() {
+ ecXHostStore = new ECHostStore();
+
+ ecXHostStore.storageService = new TestStorageService();
+ ecXHostStore.clockService = new TestLogicalClockService();
+ ecXHostStore.activate();
+ }
+
+ @After
+ public void tearDown() {
+ ecXHostStore.deactivate();
+ }
+
+ /**
+ * Tests the removeIp method call.
+ */
+ @Test
+ public void testRemoveIp() {
+ Set<IpAddress> ips = new HashSet<>();
+ ips.add(IP1);
+ ips.add(IP2);
+
+ HostDescription description = new DefaultHostDescription(HOSTID.mac(),
+ HOSTID.vlanId(),
+ HostLocation.NONE,
+ ips);
+ ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false);
+ ecXHostStore.removeIp(HOSTID, IP1);
+ Host host = ecXHostStore.getHost(HOSTID);
+
+ assertFalse(host.ipAddresses().contains(IP1));
+ assertTrue(host.ipAddresses().contains(IP2));
+ }
+
+ /**
+ * Mocks the LogicalClockService class.
+ */
+ class TestLogicalClockService implements LogicalClockService {
+ @Override
+ public Timestamp getTimestamp() {
+ return null;
+ }
+ }
+} \ No newline at end of file