diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-10-09 18:32:44 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-10-09 18:32:44 -0700 |
commit | 6a07d2d622eaa06953f3353e39c080984076e8de (patch) | |
tree | bfb50a2090fce186c2cc545a400c969bf2ea702b /framework/src/onos/core/store | |
parent | e6d71622143ff9b2421a1abbe8434b954b5b1099 (diff) |
Updated master to commit id 6ee8aa3e67ce89908a8c93aa9445c6f71a18f986
Change-Id: I94b055ee2f298daf71e2ec794fd0f2495bd8081f
Diffstat (limited to 'framework/src/onos/core/store')
23 files changed, 1295 insertions, 342 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java index ff3e36ac..52a999a4 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java @@ -38,7 +38,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p> * * Additionally, the database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and + * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource * configurations will be loaded according to namespaces as well; for example, `databases.conf`. @@ -54,7 +54,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa * Creates a new database.<p> * * The database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and + * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource * configurations will be loaded according to namespaces as well; for example, `databases.conf`. diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java index fbc2c88d..6ea7c220 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java @@ -443,7 +443,10 @@ public class DatabaseManager implements StorageService, StorageAdminService { public void event(ApplicationEvent event) { if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) { ApplicationId appId = event.subject().id(); - List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId)); + List<DefaultAsyncConsistentMap> mapsToRemove; + synchronized (mapsByApplication) { + mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId)); + } mapsToRemove.forEach(DatabaseManager.this::unregisterMap); if (event.type() == APP_UNINSTALLED) { mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear()); diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java index 95f9e39a..1d81f998 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java @@ -16,14 +16,14 @@ package org.onosproject.store.consistent.impl; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - /** * Database proxy. */ @@ -45,6 +45,7 @@ public interface DatabaseProxy<K, V> { /** * Returns the number of entries in map. + * * @param mapName map name * @return A completable future to be completed with the result once complete. */ @@ -62,7 +63,7 @@ public interface DatabaseProxy<K, V> { * Checks whether the map contains a key. * * @param mapName map name - * @param key key to check. + * @param key key to check. * @return A completable future to be completed with the result once complete. */ CompletableFuture<Boolean> mapContainsKey(String mapName, K key); @@ -71,7 +72,7 @@ public interface DatabaseProxy<K, V> { * Checks whether the map contains a value. * * @param mapName map name - * @param value The value to check. + * @param value The value to check. * @return A completable future to be completed with the result once complete. */ CompletableFuture<Boolean> mapContainsValue(String mapName, V value); @@ -80,7 +81,7 @@ public interface DatabaseProxy<K, V> { * Gets a value from the map. * * @param mapName map name - * @param key The key to get. + * @param key The key to get. * @return A completable future to be completed with the result once complete. */ CompletableFuture<Versioned<V>> mapGet(String mapName, K key); @@ -88,11 +89,11 @@ public interface DatabaseProxy<K, V> { /** * Updates the map. * - * @param mapName map name - * @param key The key to set - * @param valueMatch match for checking existing value - * @param versionMatch match for checking existing version - * @param value new value + * @param mapName map name + * @param key The key to set + * @param valueMatch match for checking existing value + * @param versionMatch match for checking existing version + * @param value new value * @return A completable future to be completed with the result once complete */ CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate( @@ -130,11 +131,11 @@ public interface DatabaseProxy<K, V> { */ CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName); - /** + /** * Atomically add the given value to current value of the specified counter. * * @param counterName counter name - * @param delta value to add + * @param delta value to add * @return updated value */ CompletableFuture<Long> counterAddAndGet(String counterName, long delta); @@ -143,11 +144,31 @@ public interface DatabaseProxy<K, V> { * Atomically add the given value to current value of the specified counter. * * @param counterName counter name - * @param delta value to add + * @param delta value to add * @return previous value */ CompletableFuture<Long> counterGetAndAdd(String counterName, long delta); + + /** + * Atomically sets the given value to current value of the specified counter. + * + * @param counterName counter name + * @param value value to set + * @return void future + */ + CompletableFuture<Void> counterSet(String counterName, long value); + + /** + * Atomically sets the given counter to the specified update value if and only if the current value is equal to the + * expected value. + * @param counterName counter name + * @param expectedValue value to use for equivalence check + * @param update value to set if expected value is current value + * @return true if an update occurred, false otherwise + */ + CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update); + /** * Returns the current value of the specified atomic counter. * @@ -158,6 +179,7 @@ public interface DatabaseProxy<K, V> { /** * Returns the size of queue. + * * @param queueName queue name * @return queue size */ @@ -165,14 +187,16 @@ public interface DatabaseProxy<K, V> { /** * Inserts an entry into the queue. + * * @param queueName queue name - * @param entry queue entry + * @param entry queue entry * @return void future */ CompletableFuture<Void> queuePush(String queueName, byte[] entry); /** * Removes an entry from the queue if the queue is non-empty. + * * @param queueName queue name * @return entry future. Can be completed with null if queue is empty */ @@ -180,6 +204,7 @@ public interface DatabaseProxy<K, V> { /** * Returns but does not remove an entry from the queue. + * * @param queueName queue name * @return entry. Can be null if queue is empty */ diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java index b3dd1c44..1136428b 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java @@ -16,18 +16,17 @@ package org.onosproject.store.consistent.impl; -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - import net.kuujo.copycat.state.Command; import net.kuujo.copycat.state.Initializer; import net.kuujo.copycat.state.Query; import net.kuujo.copycat.state.StateContext; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * Database state. @@ -83,6 +82,9 @@ public interface DatabaseState<K, V> { Long counterAddAndGet(String counterName, long delta); @Command + Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue); + + @Command Long counterGetAndAdd(String counterName, long delta); @Query diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java index 7a439c34..d851eaa0 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java @@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl; import org.onosproject.store.service.AsyncAtomicCounter; import java.util.concurrent.CompletableFuture; + import static com.google.common.base.Preconditions.checkNotNull; /** @@ -38,6 +39,8 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { private static final String GET_AND_ADD = "getAndAdd"; private static final String ADD_AND_GET = "addAndGet"; private static final String GET = "get"; + private static final String SET = "set"; + private static final String COMPARE_AND_SET = "compareAndSet"; public DefaultAsyncAtomicCounter(String name, Database database, @@ -72,13 +75,27 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { public CompletableFuture<Long> getAndAdd(long delta) { final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD); return database.counterGetAndAdd(name, delta) - .whenComplete((r, e) -> timer.stop(e)); + .whenComplete((r, e) -> timer.stop(e)); } @Override public CompletableFuture<Long> addAndGet(long delta) { final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET); return database.counterAddAndGet(name, delta) - .whenComplete((r, e) -> timer.stop(e)); + .whenComplete((r, e) -> timer.stop(e)); + } + + @Override + public CompletableFuture<Void> set(long value) { + final MeteringAgent.Context timer = monitor.startTimer(SET); + return database.counterSet(name, value) + .whenComplete((r, e) -> timer.stop(e)); + } + + @Override + public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) { + final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET); + return database.counterCompareAndSet(name, expectedValue, updateValue) + .whenComplete((r, e) -> timer.stop(e)); } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java index 64886e41..2d6a956c 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java @@ -63,6 +63,16 @@ public class DefaultAtomicCounter implements AtomicCounter { } @Override + public void set(long value) { + complete(asyncCounter.set(value)); + } + + @Override + public boolean compareAndSet(long expectedValue, long updateValue) { + return complete(asyncCounter.compareAndSet(expectedValue, updateValue)); + } + + @Override public long get() { return complete(asyncCounter.get()); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java index 4d9776ee..2a50fbd6 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java @@ -16,12 +16,15 @@ package org.onosproject.store.consistent.impl; -import net.kuujo.copycat.state.StateMachine; +import com.google.common.collect.Sets; import net.kuujo.copycat.resource.internal.AbstractResource; import net.kuujo.copycat.resource.internal.ResourceManager; +import net.kuujo.copycat.state.StateMachine; import net.kuujo.copycat.state.internal.DefaultStateMachine; import net.kuujo.copycat.util.concurrent.Futures; import net.kuujo.copycat.util.function.TriConsumer; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; import java.util.Collection; import java.util.Map; @@ -30,11 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import com.google.common.collect.Sets; - /** * Default database. */ @@ -44,7 +42,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet(); private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher(); - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) public DefaultDatabase(ResourceManager context) { super(context); this.stateMachine = new DefaultStateMachine(context, @@ -66,7 +64,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab * return the completed future result. * * @param supplier The supplier to call if the database is open. - * @param <T> The future result type. + * @param <T> The future result type. * @return A completable future that if this database is closed is immediately failed. */ protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) { @@ -153,6 +151,16 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab } @Override + public CompletableFuture<Void> counterSet(String counterName, long value) { + return checkOpen(() -> proxy.counterSet(counterName, value)); + } + + @Override + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) { + return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update)); + } + + @Override public CompletableFuture<Long> queueSize(String queueName) { return checkOpen(() -> proxy.queueSize(queueName)); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java index 9a55ffb1..8943fc87 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java @@ -16,27 +16,26 @@ package org.onosproject.store.consistent.impl; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import net.kuujo.copycat.state.Initializer; +import net.kuujo.copycat.state.StateContext; +import org.onosproject.store.service.DatabaseUpdate; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.Set; - -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import net.kuujo.copycat.state.Initializer; -import net.kuujo.copycat.state.StateContext; /** * Default database state. @@ -195,6 +194,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { } @Override + public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) { + return getCounter(counterName).compareAndSet(expectedValue, updateValue); + } + + @Override public Long counterGet(String counterName) { return getCounter(counterName).get(); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java new file mode 100644 index 00000000..d8593e37 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java @@ -0,0 +1,315 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.Tools; +import org.onosproject.cluster.ClusterEvent; +import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode.State; +import org.onosproject.cluster.NodeId; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.ConsistentMapException; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.MutexExecutionService; +import org.onosproject.store.service.MutexTask; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Implementation of a MutexExecutionService. + */ +@Component(immediate = true) +@Service +public class MutexExecutionManager implements MutexExecutionService { + + private final Logger log = getLogger(getClass()); + + protected ConsistentMap<String, MutexState> lockMap; + protected NodeId localNodeId; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener(); + private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); + + private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap(); + private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap(); + + @Activate + public void activate() { + localNodeId = clusterService.getLocalNode().id(); + lockMap = storageService.<String, MutexState>consistentMapBuilder() + .withName("onos-mutexes") + .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class)) + .withPartitionsDisabled() + .build(); + lockMap.addListener(mapEventListener); + clusterService.addListener(clusterEventListener); + releaseOldLocks(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + lockMap.removeListener(mapEventListener); + pending.values().forEach(future -> future.cancel(true)); + activeTasks.forEach((k, v) -> { + v.stop(); + unlock(k); + }); + clusterService.removeListener(clusterEventListener); + log.info("Stopped"); + } + + @Override + public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) { + return lock(exclusionPath) + .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath, + k -> new InnerMutexTask(exclusionPath, + task, + state.term()))) + .thenAcceptAsync(t -> t.start(), executor) + .whenComplete((r, e) -> unlock(exclusionPath)); + } + + protected CompletableFuture<MutexState> lock(String exclusionPath) { + CompletableFuture<MutexState> future = + pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>()); + tryLock(exclusionPath); + return future; + } + + /** + * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to + * the wait list. + * @param exclusionPath exclusion path + */ + protected void tryLock(String exclusionPath) { + Tools.retryable(() -> lockMap.asJavaMap() + .compute(exclusionPath, + (k, v) -> MutexState.admit(v, localNodeId)), + ConsistentMapException.ConcurrentModification.class, + Integer.MAX_VALUE, + 100).get(); + } + + /** + * Releases lock for the specific path. This operation is idempotent. + * @param exclusionPath exclusion path + */ + protected void unlock(String exclusionPath) { + Tools.retryable(() -> lockMap.asJavaMap() + .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)), + ConsistentMapException.ConcurrentModification.class, + Integer.MAX_VALUE, + 100).get(); + } + + /** + * Detects and releases all locks held by this node. + */ + private void releaseOldLocks() { + Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder())) + .keySet() + .forEach(path -> { + log.info("Detected zombie task still holding lock for {}. Releasing lock.", path); + unlock(path); + }); + } + + private class InternalLockMapEventListener implements MapEventListener<String, MutexState> { + + @Override + public void event(MapEvent<String, MutexState> event) { + log.debug("Received {}", event); + if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) { + pending.computeIfPresent(event.key(), (k, future) -> { + MutexState state = Versioned.valueOrElse(event.value(), null); + if (state != null && localNodeId.equals(state.holder())) { + log.debug("Local node is now owner for {}", event.key()); + future.complete(state); + return null; + } else { + return future; + } + }); + InnerMutexTask task = activeTasks.get(event.key()); + if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) { + task.stop(); + } + } + } + } + + private class InternalClusterEventListener implements ClusterEventListener { + + @Override + public void event(ClusterEvent event) { + if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || + event.type() == ClusterEvent.Type.INSTANCE_REMOVED) { + NodeId nodeId = event.subject().id(); + log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId); + lockMap.asJavaMap().forEach((k, v) -> { + if (v.contains(nodeId)) { + lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId)); + } + }); + } + long activeNodes = clusterService.getNodes() + .stream() + .map(node -> clusterService.getState(node.id())) + .filter(State.ACTIVE::equals) + .count(); + if (clusterService.getNodes().size() > 1 && activeNodes == 1) { + log.info("This node is partitioned away from the cluster. Stopping all inflight executions"); + activeTasks.forEach((k, v) -> { + v.stop(); + }); + } + } + } + + private static final class MutexState { + + private final NodeId holder; + private final List<NodeId> waitList; + private final long term; + + public static MutexState admit(MutexState state, NodeId nodeId) { + if (state == null) { + return new MutexState(nodeId, 1L, Lists.newArrayList()); + } else if (state.holder() == null) { + return new MutexState(nodeId, state.term() + 1, Lists.newArrayList()); + } else { + if (!state.contains(nodeId)) { + NodeId newHolder = state.holder(); + List<NodeId> newWaitList = Lists.newArrayList(state.waitList()); + newWaitList.add(nodeId); + return new MutexState(newHolder, state.term(), newWaitList); + } else { + return state; + } + } + } + + public static MutexState evict(MutexState state, NodeId nodeId) { + return state.evict(nodeId); + } + + public MutexState evict(NodeId nodeId) { + if (nodeId.equals(holder)) { + if (waitList.isEmpty()) { + return new MutexState(null, term, waitList); + } + List<NodeId> newWaitList = Lists.newArrayList(waitList); + NodeId newHolder = newWaitList.remove(0); + return new MutexState(newHolder, term + 1, newWaitList); + } else { + NodeId newHolder = holder; + List<NodeId> newWaitList = Lists.newArrayList(waitList); + newWaitList.remove(nodeId); + return new MutexState(newHolder, term, newWaitList); + } + } + + public NodeId holder() { + return holder; + } + + public List<NodeId> waitList() { + return waitList; + } + + public long term() { + return term; + } + + private boolean contains(NodeId nodeId) { + return (nodeId.equals(holder) || waitList.contains(nodeId)); + } + + private MutexState(NodeId holder, long term, List<NodeId> waitList) { + this.holder = holder; + this.term = term; + this.waitList = Lists.newArrayList(waitList); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("holder", holder) + .add("term", term) + .add("waitList", waitList) + .toString(); + } + } + + private class InnerMutexTask implements MutexTask { + private final MutexTask task; + private final String mutexPath; + private final long term; + + public InnerMutexTask(String mutexPath, MutexTask task, long term) { + this.mutexPath = mutexPath; + this.term = term; + this.task = task; + } + + public long term() { + return term; + } + + @Override + public void start() { + log.debug("Starting execution for mutex task guarded by {}", mutexPath); + task.start(); + log.debug("Finished execution for mutex task guarded by {}", mutexPath); + } + + @Override + public void stop() { + log.debug("Stopping execution for mutex task guarded by {}", mutexPath); + task.stop(); + } + } +}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java index a294681e..f741b367 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java @@ -16,6 +16,17 @@ package org.onosproject.store.consistent.impl; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import net.kuujo.copycat.Task; +import net.kuujo.copycat.cluster.Cluster; +import net.kuujo.copycat.resource.ResourceState; +import org.onosproject.store.service.DatabaseUpdate; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + import java.util.Collection; import java.util.List; import java.util.Map; @@ -28,18 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import net.kuujo.copycat.Task; -import net.kuujo.copycat.cluster.Cluster; -import net.kuujo.copycat.resource.ResourceState; import static com.google.common.base.Preconditions.checkState; /** @@ -100,10 +99,10 @@ public class PartitionedDatabase implements Database { return CompletableFuture.allOf(partitions .stream() .map(db -> db.counters() - .thenApply(m -> { - counters.putAll(m); - return null; - })) + .thenApply(m -> { + counters.putAll(m); + return null; + })) .toArray(CompletableFuture[]::new)) .thenApply(v -> counters); } @@ -113,9 +112,9 @@ public class PartitionedDatabase implements Database { checkState(isOpen.get(), DB_NOT_OPEN); AtomicInteger totalSize = new AtomicInteger(0); return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) - .toArray(CompletableFuture[]::new)) + .stream() + .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) + .toArray(CompletableFuture[]::new)) .thenApply(v -> totalSize.get()); } @@ -136,10 +135,10 @@ public class PartitionedDatabase implements Database { checkState(isOpen.get(), DB_NOT_OPEN); AtomicBoolean containsValue = new AtomicBoolean(false); return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapContainsValue(mapName, value) - .thenApply(v -> containsValue.compareAndSet(false, v))) - .toArray(CompletableFuture[]::new)) + .stream() + .map(p -> p.mapContainsValue(mapName, value) + .thenApply(v -> containsValue.compareAndSet(false, v))) + .toArray(CompletableFuture[]::new)) .thenApply(v -> containsValue.get()); } @@ -196,9 +195,9 @@ public class PartitionedDatabase implements Database { checkState(isOpen.get(), DB_NOT_OPEN); Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) - .toArray(CompletableFuture[]::new)) + .stream() + .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) + .toArray(CompletableFuture[]::new)) .thenApply(v -> entrySet); } @@ -220,6 +219,19 @@ public class PartitionedDatabase implements Database { return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); } + @Override + public CompletableFuture<Void> counterSet(String counterName, long value) { + checkState(isOpen.get(), DB_NOT_OPEN); + return partitioner.getPartition(counterName, counterName).counterSet(counterName, value); + } + + @Override + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) { + checkState(isOpen.get(), DB_NOT_OPEN); + return partitioner.getPartition(counterName, counterName). + counterCompareAndSet(counterName, expectedValue, updateValue); + + } @Override public CompletableFuture<Long> queueSize(String queueName) { @@ -268,8 +280,8 @@ public class PartitionedDatabase implements Database { AtomicBoolean status = new AtomicBoolean(true); return CompletableFuture.allOf(subTransactions.entrySet() .stream() - .map(entry -> entry - .getKey() + .map(entry -> entry + .getKey() .prepare(entry.getValue()) .thenApply(v -> status.compareAndSet(true, v))) .toArray(CompletableFuture[]::new)) @@ -282,15 +294,15 @@ public class PartitionedDatabase implements Database { AtomicBoolean success = new AtomicBoolean(true); List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList(); return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().commit(entry.getValue()) - .thenAccept(response -> { - success.set(success.get() && response.success()); - if (success.get()) { - allUpdates.addAll(response.updates()); - } - })) - .toArray(CompletableFuture[]::new)) + .stream() + .map(entry -> entry.getKey().commit(entry.getValue()) + .thenAccept(response -> { + success.set(success.get() && response.success()); + if (success.get()) { + allUpdates.addAll(response.updates()); + } + })) + .toArray(CompletableFuture[]::new)) .thenApply(v -> success.get() ? CommitResponse.success(allUpdates) : CommitResponse.failure()); } @@ -301,7 +313,7 @@ public class PartitionedDatabase implements Database { return CompletableFuture.allOf(subTransactions.entrySet() .stream() .map(entry -> entry.getKey().rollback(entry.getValue())) - .toArray(CompletableFuture[]::new)) + .toArray(CompletableFuture[]::new)) .thenApply(v -> true); } @@ -384,3 +396,4 @@ public class PartitionedDatabase implements Database { partitions.forEach(p -> p.unregisterConsumer(consumer)); } } + diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java index 2859b62f..f1e0dbd4 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - import org.apache.commons.lang3.tuple.Pair; import org.onlab.util.AbstractAccumulator; import org.onlab.util.KryoNamespace; @@ -33,18 +32,15 @@ import org.onosproject.store.Timestamp; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.impl.LogicalTimestamp; -import org.onosproject.store.service.WallClockTimestamp; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.EventuallyConsistentMap; import org.onosproject.store.service.EventuallyConsistentMapEvent; import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.WallClockTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; - import java.util.Collection; import java.util.Collections; import java.util.List; @@ -67,6 +63,8 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; /** * Distributed Map implementation which uses optimistic replication and gossip @@ -359,7 +357,7 @@ public class EventuallyConsistentMapImpl<K, V> valueMatches = Objects.equals(value.get(), existing.get()); } if (existing == null) { - log.debug("ECMap Remove: Existing value for key {} is already null", k); + log.trace("ECMap Remove: Existing value for key {} is already null", k); } if (valueMatches) { if (existing == null) { @@ -523,7 +521,7 @@ public class EventuallyConsistentMapImpl<K, V> return; } peers.forEach(node -> - senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event) + senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event) ); } @@ -576,8 +574,10 @@ public class EventuallyConsistentMapImpl<K, V> return; } try { - log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it", - mapName, ad.sender(), ad.digest().size()); + if (log.isTraceEnabled()) { + log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it", + mapName, ad.sender(), ad.digest().size()); + } antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners); if (!lightweightAntiEntropy) { @@ -675,4 +675,4 @@ public class EventuallyConsistentMapImpl<K, V> }); } } -}
\ No newline at end of file +} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java index de7a3ac3..8cd63e7d 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java @@ -16,6 +16,7 @@ package org.onosproject.store.flow.impl; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -57,6 +58,7 @@ import org.onosproject.net.flow.FlowRuleService; import org.onosproject.net.flow.FlowRuleStore; import org.onosproject.net.flow.FlowRuleStoreDelegate; import org.onosproject.net.flow.StoredFlowEntry; +import org.onosproject.net.flow.TableStatisticsEntry; import org.onosproject.store.AbstractStore; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.ClusterMessage; @@ -64,9 +66,16 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler; import org.onosproject.store.flow.ReplicaInfoEvent; import org.onosproject.store.flow.ReplicaInfoEventListener; import org.onosproject.store.flow.ReplicaInfoService; +import org.onosproject.store.impl.MastershipBasedTimestamp; +import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.serializers.StoreSerializer; import org.onosproject.store.serializers.custom.DistributedStoreSerializers; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.WallClockTimestamp; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; @@ -151,6 +160,13 @@ public class NewDistributedFlowRuleStore private final ScheduledExecutorService backupSenderExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender")); + private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats; + private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener = + new InternalTableStatsListener(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + protected static final StoreSerializer SERIALIZER = new KryoSerializer() { @Override protected void setupKryoPool() { @@ -161,6 +177,11 @@ public class NewDistributedFlowRuleStore } }; + protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(MastershipBasedTimestamp.class); + + private IdGenerator idGenerator; private NodeId local; @@ -186,6 +207,15 @@ public class NewDistributedFlowRuleStore TimeUnit.MILLISECONDS); } + deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder() + .withName("onos-flow-table-stats") + .withSerializer(SERIALIZER_BUILDER) + .withAntiEntropyPeriod(5, TimeUnit.SECONDS) + .withTimestampProvider((k, v) -> new WallClockTimestamp()) + .withTombstonesDisabled() + .build(); + deviceTableStats.addListener(tableStatsListener); + logConfig("Started"); } @@ -197,6 +227,8 @@ public class NewDistributedFlowRuleStore } configService.unregisterProperties(getClass(), false); unregisterMessageHandlers(); + deviceTableStats.removeListener(tableStatsListener); + deviceTableStats.destroy(); messageHandlingExecutor.shutdownNow(); backupSenderExecutor.shutdownNow(); log.info("Stopped"); @@ -786,4 +818,36 @@ public class NewDistributedFlowRuleStore return backedupDevices; } } + + @Override + public FlowRuleEvent updateTableStatistics(DeviceId deviceId, + List<TableStatisticsEntry> tableStats) { + deviceTableStats.put(deviceId, tableStats); + return null; + } + + @Override + public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) { + NodeId master = mastershipService.getMasterFor(deviceId); + + if (master == null) { + log.debug("Failed to getTableStats: No master for {}", deviceId); + return Collections.emptyList(); + } + + List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId); + if (tableStats == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(tableStats); + } + + private class InternalTableStatsListener + implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> { + @Override + public void event(EventuallyConsistentMapEvent<DeviceId, + List<TableStatisticsEntry>> event) { + //TODO: Generate an event to listeners (do we need?) + } + } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java index 97333ebf..a999ee7f 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java @@ -28,19 +28,11 @@ import org.apache.felix.scr.annotations.Service; import org.onlab.util.KryoNamespace; import org.onlab.util.NewConcurrentHashMap; import org.onosproject.cluster.ClusterService; -import org.onosproject.core.DefaultApplicationId; import org.onosproject.core.DefaultGroupId; import org.onosproject.core.GroupId; import org.onosproject.mastership.MastershipService; import org.onosproject.net.DeviceId; import org.onosproject.net.MastershipRole; -import org.onosproject.net.PortNumber; -import org.onosproject.net.flow.DefaultTrafficTreatment; -import org.onosproject.net.flow.FlowRule; -import org.onosproject.net.flow.instructions.Instructions; -import org.onosproject.net.flow.instructions.L0ModificationInstruction; -import org.onosproject.net.flow.instructions.L2ModificationInstruction; -import org.onosproject.net.flow.instructions.L3ModificationInstruction; import org.onosproject.net.group.DefaultGroup; import org.onosproject.net.group.DefaultGroupBucket; import org.onosproject.net.group.DefaultGroupDescription; @@ -61,9 +53,7 @@ import org.onosproject.net.group.StoredGroupEntry; import org.onosproject.store.AbstractStore; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.service.MultiValuedTimestamp; -import org.onosproject.store.serializers.DeviceIdSerializer; import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.URISerializer; import org.onosproject.store.service.EventuallyConsistentMap; import org.onosproject.store.service.EventuallyConsistentMapBuilder; import org.onosproject.store.service.EventuallyConsistentMapEvent; @@ -71,7 +61,6 @@ import org.onosproject.store.service.EventuallyConsistentMapListener; import org.onosproject.store.service.StorageService; import org.slf4j.Logger; -import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -139,9 +128,12 @@ public class DistributedGroupStore private final AtomicLong sequenceNumber = new AtomicLong(0); + private KryoNamespace clusterMsgSerializer; + @Activate public void activate() { kryoBuilder = new KryoNamespace.Builder() + .register(KryoNamespaces.API) .register(DefaultGroup.class, DefaultGroupBucket.class, DefaultGroupDescription.class, @@ -158,38 +150,9 @@ public class DistributedGroupStore GroupStoreKeyMapKey.class, GroupStoreIdMapKey.class, GroupStoreMapKey.class - ) - .register(new URISerializer(), URI.class) - .register(new DeviceIdSerializer(), DeviceId.class) - .register(PortNumber.class) - .register(DefaultApplicationId.class) - .register(DefaultTrafficTreatment.class, - Instructions.DropInstruction.class, - Instructions.OutputInstruction.class, - Instructions.GroupInstruction.class, - Instructions.TableTypeTransition.class, - FlowRule.Type.class, - L0ModificationInstruction.class, - L0ModificationInstruction.L0SubType.class, - L0ModificationInstruction.ModLambdaInstruction.class, - L2ModificationInstruction.class, - L2ModificationInstruction.L2SubType.class, - L2ModificationInstruction.ModEtherInstruction.class, - L2ModificationInstruction.PushHeaderInstructions.class, - L2ModificationInstruction.ModVlanIdInstruction.class, - L2ModificationInstruction.ModVlanPcpInstruction.class, - L2ModificationInstruction.ModMplsLabelInstruction.class, - L2ModificationInstruction.ModMplsTtlInstruction.class, - L3ModificationInstruction.class, - L3ModificationInstruction.L3SubType.class, - L3ModificationInstruction.ModIPInstruction.class, - L3ModificationInstruction.ModIPv6FlowLabelInstruction.class, - L3ModificationInstruction.ModTtlInstruction.class, - org.onlab.packet.MplsLabel.class - ) - .register(org.onosproject.cluster.NodeId.class) - .register(KryoNamespaces.BASIC) - .register(KryoNamespaces.MISC); + ); + + clusterMsgSerializer = kryoBuilder.build(); messageHandlingExecutor = Executors. newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, @@ -197,7 +160,7 @@ public class DistributedGroupStore "message-handlers")); clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, - kryoBuilder.build()::deserialize, + clusterMsgSerializer::deserialize, this::process, messageHandlingExecutor); @@ -233,6 +196,7 @@ public class DistributedGroupStore @Deactivate public void deactivate() { + clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST); groupStoreEntriesByKey.destroy(); auditPendingReqQueue.destroy(); log.info("Stopped"); @@ -313,8 +277,6 @@ public class DistributedGroupStore @Override public Iterable<Group> getGroups(DeviceId deviceId) { // flatten and make iterator unmodifiable - log.debug("getGroups: for device {} total number of groups {}", - deviceId, getGroupStoreKeyMap().values().size()); return FluentIterable.from(getGroupStoreKeyMap().values()) .filter(input -> input.deviceId().equals(deviceId)) .transform(input -> input); @@ -322,8 +284,6 @@ public class DistributedGroupStore private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) { // flatten and make iterator unmodifiable - log.debug("getGroups: for device {} total number of groups {}", - deviceId, getGroupStoreKeyMap().values().size()); return FluentIterable.from(getGroupStoreKeyMap().values()) .filter(input -> input.deviceId().equals(deviceId)); } @@ -411,7 +371,7 @@ public class DistributedGroupStore clusterCommunicator.unicast(groupOp, GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, - m -> kryoBuilder.build().serialize(m), + clusterMsgSerializer::serialize, mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> { if (error != null) { log.warn("Failed to send request to master: {} to {}", @@ -609,7 +569,7 @@ public class DistributedGroupStore clusterCommunicator.unicast(groupOp, GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, - m -> kryoBuilder.build().serialize(m), + clusterMsgSerializer::serialize, mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> { if (error != null) { log.warn("Failed to send request to master: {} to {}", @@ -741,7 +701,7 @@ public class DistributedGroupStore clusterCommunicator.unicast(groupOp, GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, - m -> kryoBuilder.build().serialize(m), + clusterMsgSerializer::serialize, mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> { if (error != null) { log.warn("Failed to send request to master: {} to {}", diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java index d0b827cd..f9c96891 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java @@ -27,6 +27,7 @@ import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.RE import static org.slf4j.LoggerFactory.getLogger; import java.util.Collection; +import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -67,7 +68,6 @@ import org.onosproject.store.service.StorageService; import org.slf4j.Logger; import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; @@ -197,6 +197,35 @@ public class ECHostStore } @Override + public HostEvent removeIp(HostId hostId, IpAddress ipAddress) { + DefaultHost host = hosts.compute(hostId, (id, existingHost) -> { + if (existingHost != null) { + checkState(Objects.equals(hostId.mac(), existingHost.mac()), + "Existing and new MAC addresses differ."); + checkState(Objects.equals(hostId.vlanId(), existingHost.vlan()), + "Existing and new VLANs differ."); + + Set<IpAddress> addresses = existingHost.ipAddresses(); + if (addresses != null && addresses.contains(ipAddress)) { + addresses = new HashSet<>(existingHost.ipAddresses()); + addresses.remove(ipAddress); + return new DefaultHost(existingHost.providerId(), + hostId, + existingHost.mac(), + existingHost.vlan(), + existingHost.location(), + ImmutableSet.copyOf(addresses), + existingHost.annotations()); + } else { + return existingHost; + } + } + return null; + }); + return host != null ? new HostEvent(HOST_UPDATED, host) : null; + } + + @Override public int getHostCount() { return hosts.size(); } @@ -228,17 +257,23 @@ public class ECHostStore @Override public Set<Host> getConnectedHosts(ConnectPoint connectPoint) { - return ImmutableSet.copyOf(locations.get(connectPoint)); + synchronized (locations) { + return ImmutableSet.copyOf(locations.get(connectPoint)); + } } @Override public Set<Host> getConnectedHosts(DeviceId deviceId) { - return ImmutableMultimap.copyOf(locations) - .entries() - .stream() - .filter(entry -> entry.getKey().deviceId().equals(deviceId)) - .map(entry -> entry.getValue()) - .collect(Collectors.toSet()); + Set<Host> filtered; + synchronized (locations) { + filtered = locations + .entries() + .stream() + .filter(entry -> entry.getKey().deviceId().equals(deviceId)) + .map(entry -> entry.getValue()) + .collect(Collectors.toSet()); + } + return ImmutableSet.copyOf(filtered); } private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) { diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java index fa3a0751..1e5db99c 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java @@ -193,7 +193,7 @@ public class GossipIntentStore private Collection<NodeId> getPeerNodes(Key key, IntentData data) { NodeId master = partitionService.getLeader(key); NodeId origin = (data != null) ? data.origin() : null; - if (master == null || origin == null) { + if (data != null && (master == null || origin == null)) { log.debug("Intent {} missing master and/or origin; master = {}, origin = {}", key, master, origin); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java index 105c77df..47aa85c5 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java @@ -826,7 +826,7 @@ public class GossipLinkStore public void handle(ClusterMessage message) { log.trace("Received link event from peer: {}", message.sender()); - InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload()); + InternalLinkEvent event = SERIALIZER.decode(message.payload()); ProviderId providerId = event.providerId(); Timestamped<LinkDescription> linkDescription = event.linkDescription(); @@ -845,7 +845,7 @@ public class GossipLinkStore public void handle(ClusterMessage message) { log.trace("Received link removed event from peer: {}", message.sender()); - InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload()); + InternalLinkRemovedEvent event = SERIALIZER.decode(message.payload()); LinkKey linkKey = event.linkKey(); Timestamp timestamp = event.timestamp(); diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java index d4c89c93..f0f3eb5e 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java @@ -15,7 +15,9 @@ */ package org.onosproject.store.packet.impl; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -41,14 +43,13 @@ import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Versioned; import org.slf4j.Logger; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; @@ -117,6 +118,7 @@ public class DistributedPacketStore public void deactivate() { communicationService.removeSubscriber(PACKET_OUT_SUBJECT); messageHandlingExecutor.shutdown(); + tracker = null; log.info("Stopped"); } @@ -143,13 +145,13 @@ public class DistributedPacketStore } @Override - public boolean requestPackets(PacketRequest request) { - return tracker.add(request); + public void requestPackets(PacketRequest request) { + tracker.add(request); } @Override - public boolean cancelPackets(PacketRequest request) { - return tracker.remove(request); + public void cancelPackets(PacketRequest request) { + tracker.remove(request); } @Override @@ -169,33 +171,50 @@ public class DistributedPacketStore .build(); } - public boolean add(PacketRequest request) { - Versioned<Set<PacketRequest>> old = requests.get(request.selector()); - if (old != null && old.value().contains(request)) { - return false; + public void add(PacketRequest request) { + AtomicBoolean firstRequest = new AtomicBoolean(false); + requests.compute(request.selector(), (s, existingRequests) -> { + if (existingRequests == null) { + firstRequest.set(true); + return ImmutableSet.of(request); + } else if (!existingRequests.contains(request)) { + return ImmutableSet.<PacketRequest>builder() + .addAll(existingRequests) + .add(request) + .build(); + } else { + return existingRequests; + } + }); + + if (firstRequest.get() && delegate != null) { + // The instance that makes the first request will push to all devices + delegate.requestPackets(request); } - // FIXME: add retry logic using a random delay - Set<PacketRequest> newSet = new HashSet<>(); - newSet.add(request); - if (old == null) { - return requests.putIfAbsent(request.selector(), newSet) == null; - } - newSet.addAll(old.value()); - return requests.replace(request.selector(), old.version(), newSet); } - public boolean remove(PacketRequest request) { - Versioned<Set<PacketRequest>> old = requests.get(request.selector()); - if (old == null || !old.value().contains(request)) { - return false; - } - // FIXME: add retry logic using a random delay - Set<PacketRequest> newSet = new HashSet<>(old.value()); - newSet.remove(request); - if (newSet.isEmpty()) { - return requests.remove(request.selector(), old.version()); + public void remove(PacketRequest request) { + AtomicBoolean removedLast = new AtomicBoolean(false); + requests.computeIfPresent(request.selector(), (s, existingRequests) -> { + if (existingRequests.contains(request)) { + Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests); + newRequests.remove(request); + if (newRequests.size() > 0) { + return ImmutableSet.copyOf(newRequests); + } else { + removedLast.set(true); + return null; + } + } else { + return existingRequests; + } + }); + + if (removedLast.get() && delegate != null) { + // The instance that removes the last request will remove from all devices + delegate.cancelPackets(request); } - return requests.replace(request.selector(), old.version(), newSet); + } public List<PacketRequest> requests() { @@ -204,6 +223,5 @@ public class DistributedPacketStore list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); return list; } - } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java new file mode 100644 index 00000000..87e67215 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentIntentSetMultimap.java @@ -0,0 +1,111 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.resource.impl; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.intent.IntentId; +import org.onosproject.net.resource.device.IntentSetMultimap; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.Set; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * A collection that maps Intent IDs as keys to values as Intent IDs, + * where each key may associated with multiple values without duplication. + */ +@Component(immediate = true, enabled = true) +@Service +public class ConsistentIntentSetMultimap implements IntentSetMultimap { + private final Logger log = getLogger(getClass()); + + private static final String INTENT_MAPPING = "IntentMapping"; + + private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API); + + private ConsistentMap<IntentId, Set<IntentId>> intentMapping; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + + @Activate + public void activate() { + intentMapping = storageService.<IntentId, Set<IntentId>>consistentMapBuilder() + .withName(INTENT_MAPPING) + .withSerializer(SERIALIZER) + .build(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public Set<IntentId> getMapping(IntentId intentId) { + Versioned<Set<IntentId>> result = intentMapping.get(intentId); + + if (result != null) { + return result.value(); + } + + return null; + } + + @Override + public boolean allocateMapping(IntentId keyIntentId, IntentId valIntentId) { + Versioned<Set<IntentId>> versionedIntents = intentMapping.get(keyIntentId); + + if (versionedIntents == null) { + Set<IntentId> newSet = new HashSet<>(); + newSet.add(valIntentId); + intentMapping.put(keyIntentId, newSet); + } else { + versionedIntents.value().add(valIntentId); + } + + return true; + } + + @Override + public void releaseMapping(IntentId intentId) { + for (IntentId intent : intentMapping.keySet()) { + // TODO: optimize by checking for identical src & dst + Set<IntentId> mapping = intentMapping.get(intent).value(); + if (mapping.remove(intentId)) { + return; + } + } + } + +} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java index 3a296353..11137aa2 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java @@ -40,7 +40,6 @@ import org.onosproject.net.Link; import org.onosproject.net.LinkKey; import org.onosproject.net.Port; import org.onosproject.net.intent.IntentId; -import org.onosproject.net.link.LinkService; import org.onosproject.net.resource.link.BandwidthResource; import org.onosproject.net.resource.link.BandwidthResourceAllocation; import org.onosproject.net.resource.link.LambdaResource; @@ -69,7 +68,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static org.slf4j.LoggerFactory.getLogger; import static org.onosproject.net.AnnotationKeys.BANDWIDTH; @@ -108,9 +106,6 @@ public class ConsistentLinkResourceStore extends protected StorageService storageService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected LinkService linkService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceService deviceService; @Activate @@ -139,29 +134,30 @@ public class ConsistentLinkResourceStore extends return storageService.transactionContextBuilder().build(); } - private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) { - if (type == ResourceType.BANDWIDTH) { - return ImmutableSet.of(getBandwidthResourceCapacity(link)); - } - if (type == ResourceType.LAMBDA) { - return getLambdaResourceCapacity(link); + private Set<ResourceAllocation> getResourceCapacity(ResourceType type, Link link) { + switch (type) { + case BANDWIDTH: + return ImmutableSet.of(getBandwidthResourceCapacity(link)); + case LAMBDA: + return getLambdaResourceCapacity(link); + case MPLS_LABEL: + return getMplsResourceCapacity(); + default: + return ImmutableSet.of(); } - if (type == ResourceType.MPLS_LABEL) { - return getMplsResourceCapacity(); - } - return ImmutableSet.of(); } - private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) { - Set<LambdaResourceAllocation> allocations = new HashSet<>(); + private Set<ResourceAllocation> getLambdaResourceCapacity(Link link) { Port port = deviceService.getPort(link.src().deviceId(), link.src().port()); - if (port instanceof OmsPort) { - OmsPort omsPort = (OmsPort) port; + if (!(port instanceof OmsPort)) { + return Collections.emptySet(); + } - // Assume fixed grid for now - for (int i = 0; i < omsPort.totalChannels(); i++) { - allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i))); - } + OmsPort omsPort = (OmsPort) port; + Set<ResourceAllocation> allocations = new HashSet<>(); + // Assume fixed grid for now + for (int i = 0; i < omsPort.totalChannels(); i++) { + allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i))); } return allocations; } @@ -170,26 +166,23 @@ public class ConsistentLinkResourceStore extends // if Link annotation exist, use them // if all fails, use DEFAULT_BANDWIDTH - BandwidthResource bandwidth = null; + BandwidthResource bandwidth = DEFAULT_BANDWIDTH; String strBw = link.annotations().value(BANDWIDTH); - if (strBw != null) { - try { - bandwidth = new BandwidthResource(Bandwidth.mbps(Double.parseDouble(strBw))); - } catch (NumberFormatException e) { - // do nothings - bandwidth = null; - } + if (strBw == null) { + return new BandwidthResourceAllocation(bandwidth); } - if (bandwidth == null) { - // fall back, use fixed default + try { + bandwidth = new BandwidthResource(Bandwidth.mbps(Double.parseDouble(strBw))); + } catch (NumberFormatException e) { + // do nothings, use default bandwidth bandwidth = DEFAULT_BANDWIDTH; } return new BandwidthResourceAllocation(bandwidth); } - private Set<MplsLabelResourceAllocation> getMplsResourceCapacity() { - Set<MplsLabelResourceAllocation> allocations = new HashSet<>(); + private Set<ResourceAllocation> getMplsResourceCapacity() { + Set<ResourceAllocation> allocations = new HashSet<>(); //Ignoring reserved labels of 0 through 15 for (int i = MIN_UNRESERVED_LABEL; i <= MAX_UNRESERVED_LABEL; i++) { allocations.add(new MplsLabelResourceAllocation(MplsLabel @@ -199,13 +192,11 @@ public class ConsistentLinkResourceStore extends return allocations; } - private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) { - Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>(); + private Map<ResourceType, Set<ResourceAllocation>> getResourceCapacity(Link link) { + Map<ResourceType, Set<ResourceAllocation>> caps = new HashMap<>(); for (ResourceType type : ResourceType.values()) { - Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link); - if (cap != null) { - caps.put(type, cap); - } + Set<ResourceAllocation> cap = getResourceCapacity(type, link); + caps.put(type, cap); } return caps; } @@ -216,106 +207,80 @@ public class ConsistentLinkResourceStore extends tx.begin(); try { - Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(tx, link); - Set<ResourceAllocation> allFree = new HashSet<>(); - freeResources.values().forEach(allFree::addAll); - return allFree; + Map<ResourceType, Set<ResourceAllocation>> freeResources = getFreeResourcesEx(tx, link); + return freeResources.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); } finally { tx.abort(); } } - private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(TransactionContext tx, Link link) { + private Map<ResourceType, Set<ResourceAllocation>> getFreeResourcesEx(TransactionContext tx, Link link) { checkNotNull(tx); checkNotNull(link); - Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>(); - final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link); - final Iterable<LinkResourceAllocations> allocations = getAllocations(tx, link); + Map<ResourceType, Set<ResourceAllocation>> free = new HashMap<>(); + final Map<ResourceType, Set<ResourceAllocation>> caps = getResourceCapacity(link); + final List<LinkResourceAllocations> allocations = ImmutableList.copyOf(getAllocations(tx, link)); - for (ResourceType type : ResourceType.values()) { - // there should be class/category of resources + Set<ResourceAllocation> bw = caps.get(ResourceType.BANDWIDTH); + Set<ResourceAllocation> value = getFreeBandwidthResources(link, bw, allocations); + free.put(ResourceType.BANDWIDTH, value); - switch (type) { - case BANDWIDTH: - Set<? extends ResourceAllocation> bw = caps.get(type); - if (bw == null || bw.isEmpty()) { - bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW)); - } + Set<ResourceAllocation> lmd = caps.get(ResourceType.LAMBDA); + Set<ResourceAllocation> freeL = getFreeResources(link, lmd, allocations, + LambdaResourceAllocation.class); + free.put(ResourceType.LAMBDA, freeL); - BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next(); - double freeBw = cap.bandwidth().toDouble(); - - // enumerate current allocations, subtracting resources - for (LinkResourceAllocations alloc : allocations) { - Set<ResourceAllocation> types = alloc.getResourceAllocation(link); - for (ResourceAllocation a : types) { - if (a instanceof BandwidthResourceAllocation) { - BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a; - freeBw -= bwA.bandwidth().toDouble(); - } - } - } + Set<ResourceAllocation> mpls = caps.get(ResourceType.MPLS_LABEL); + Set<ResourceAllocation> freeLabel = getFreeResources(link, mpls, allocations, + MplsLabelResourceAllocation.class); + free.put(ResourceType.MPLS_LABEL, freeLabel); - free.put(type, Sets.newHashSet( - new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(freeBw))))); - break; - case LAMBDA: - Set<? extends ResourceAllocation> lmd = caps.get(type); - if (lmd == null || lmd.isEmpty()) { - // nothing left - break; - } - Set<LambdaResourceAllocation> freeL = new HashSet<>(); - for (ResourceAllocation r : lmd) { - if (r instanceof LambdaResourceAllocation) { - freeL.add((LambdaResourceAllocation) r); - } - } - - // enumerate current allocations, removing resources - for (LinkResourceAllocations alloc : allocations) { - Set<ResourceAllocation> types = alloc.getResourceAllocation(link); - for (ResourceAllocation a : types) { - if (a instanceof LambdaResourceAllocation) { - freeL.remove(a); - } - } - } + return free; + } - free.put(type, freeL); - break; - case MPLS_LABEL: - Set<? extends ResourceAllocation> mpls = caps.get(type); - if (mpls == null || mpls.isEmpty()) { - // nothing left - break; - } - Set<MplsLabelResourceAllocation> freeLabel = new HashSet<>(); - for (ResourceAllocation r : mpls) { - if (r instanceof MplsLabelResourceAllocation) { - freeLabel.add((MplsLabelResourceAllocation) r); - } - } + private Set<ResourceAllocation> getFreeBandwidthResources(Link link, Set<ResourceAllocation> bw, + List<LinkResourceAllocations> allocations) { + if (bw == null || bw.isEmpty()) { + bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW)); + } - // enumerate current allocations, removing resources - for (LinkResourceAllocations alloc : allocations) { - Set<ResourceAllocation> types = alloc.getResourceAllocation(link); - for (ResourceAllocation a : types) { - if (a instanceof MplsLabelResourceAllocation) { - freeLabel.remove(a); - } - } - } + BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next(); + double freeBw = cap.bandwidth().toDouble(); + + // enumerate current allocations, subtracting resources + double allocatedBw = allocations.stream() + .flatMap(x -> x.getResourceAllocation(link).stream()) + .filter(x -> x instanceof BandwidthResourceAllocation) + .map(x -> (BandwidthResourceAllocation) x) + .mapToDouble(x -> x.bandwidth().toDouble()) + .sum(); + freeBw -= allocatedBw; + return Sets.newHashSet( + new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(freeBw)))); + } - free.put(type, freeLabel); - break; - default: - log.debug("unsupported ResourceType {}", type); - break; - } + private Set<ResourceAllocation> getFreeResources(Link link, + Set<ResourceAllocation> resources, + List<LinkResourceAllocations> allocations, + Class<? extends ResourceAllocation> cls) { + if (resources == null || resources.isEmpty()) { + // nothing left + return Collections.emptySet(); } - return free; + Set<ResourceAllocation> freeL = resources.stream() + .filter(cls::isInstance) + .collect(Collectors.toSet()); + + // enumerate current allocations, removing resources + List<ResourceAllocation> allocated = allocations.stream() + .flatMap(x -> x.getResourceAllocation(link).stream()) + .filter(cls::isInstance) + .collect(Collectors.toList()); + freeL.removeAll(allocated); + return freeL; } @Override @@ -329,6 +294,9 @@ public class ConsistentLinkResourceStore extends intentAllocs.put(allocations.intentId(), allocations); allocations.links().forEach(link -> allocateLinkResource(tx, link, allocations)); tx.commit(); + } catch (TransactionException | ResourceAllocationException e) { + log.error("Exception thrown, rolling back", e); + tx.abort(); } catch (Exception e) { log.error("Exception thrown, rolling back", e); tx.abort(); @@ -340,15 +308,13 @@ public class ConsistentLinkResourceStore extends LinkResourceAllocations allocations) { // requested resources Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link); - Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(tx, link); + Map<ResourceType, Set<ResourceAllocation>> available = getFreeResourcesEx(tx, link); for (ResourceAllocation req : reqs) { - Set<? extends ResourceAllocation> avail = available.get(req.type()); + Set<ResourceAllocation> avail = available.get(req.type()); if (req instanceof BandwidthResourceAllocation) { // check if allocation should be accepted if (avail.isEmpty()) { - checkState(!avail.isEmpty(), - "There's no Bandwidth resource on %s?", - link); + throw new ResourceAllocationException(String.format("There's no Bandwidth resource on %s?", link)); } BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next(); double bwLeft = bw.bandwidth().toDouble(); @@ -395,12 +361,7 @@ public class ConsistentLinkResourceStore extends if (before == null) { List<LinkResourceAllocations> after = new ArrayList<>(); after.add(allocations); - before = linkAllocs.putIfAbsent(linkKey, after); - if (before != null) { - // concurrent allocation detected, retry transaction : is this needed? - log.warn("Concurrent Allocation, retrying"); - throw new TransactionException(); - } + linkAllocs.putIfAbsent(linkKey, after); } else { List<LinkResourceAllocations> after = new ArrayList<>(before.size() + 1); after.addAll(before); @@ -500,19 +461,18 @@ public class ConsistentLinkResourceStore extends checkNotNull(link); final LinkKey key = LinkKey.linkKey(link); TransactionalMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx); - List<LinkResourceAllocations> res = null; - res = linkAllocs.get(key); - if (res == null) { - res = linkAllocs.putIfAbsent(key, new ArrayList<>()); + List<LinkResourceAllocations> res = linkAllocs.get(key); + if (res != null) { + return res; + } - if (res == null) { - return Collections.emptyList(); - } else { - return res; - } + res = linkAllocs.putIfAbsent(key, new ArrayList<>()); + if (res == null) { + return Collections.emptyList(); + } else { + return res; } - return res; } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java new file mode 100644 index 00000000..0cd4a831 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java @@ -0,0 +1,289 @@ +/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.statistic.impl;
+
+import com.google.common.base.Objects;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.statistic.FlowStatisticStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Maintains flow statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedFlowStatisticStore implements FlowStatisticStore {
+ private final Logger log = getLogger(getClass());
+
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private Map<ConnectPoint, Set<FlowEntry>> previous =
+ new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> current =
+ new ConcurrentHashMap<>();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ // register this store specific classes here
+ .build();
+ }
+ };
+
+ private NodeId local;
+ private ExecutorService messageHandlingExecutor;
+
+ private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
+
+ @Activate
+ public void activate() {
+ local = clusterService.getLocalNode().id();
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/statistic", "message-handlers"));
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(GET_PREVIOUS);
+ clusterCommunicator.removeSubscriber(GET_CURRENT);
+ messageHandlingExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public synchronized void removeFlowStatistic(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // remove this rule if present from current map
+ current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+
+ // remove this on if present from previous map
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ @Override
+ public synchronized void addFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // create one if absent and add this rule
+ current.putIfAbsent(cp, new HashSet<>());
+ current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
+
+ // remove previous one if present
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ public synchronized void updateFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ Set<FlowEntry> curr = current.get(cp);
+ if (curr == null) {
+ addFlowStatistic(rule);
+ } else {
+ Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
+ findAny();
+ if (f.isPresent() && rule.bytes() < f.get().bytes()) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " Invalid Flow Update! Will be removed!!" +
+ " curr flowId=" + Long.toHexString(rule.id().value()) +
+ ", prev flowId=" + Long.toHexString(f.get().id().value()) +
+ ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
+ ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
+ ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
+ // something is wrong! invalid flow entry, so delete it
+ removeFlowStatistic(rule);
+ return;
+ }
+ Set<FlowEntry> prev = previous.get(cp);
+ if (prev == null) {
+ prev = new HashSet<>();
+ previous.put(cp, prev);
+ }
+
+ // previous one is exist
+ if (f.isPresent()) {
+ // remove old one and add new one
+ prev.remove(rule);
+ if (!prev.add(f.get())) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into previous.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+
+ // remove old one and add new one
+ curr.remove(rule);
+ if (!curr.add(rule)) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into current.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+ }
+
+ @Override
+ public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getCurrentStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_CURRENT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+ return current.get(connectPoint);
+ }
+
+ @Override
+ public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getPreviousStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_PREVIOUS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+ return previous.get(connectPoint);
+ }
+
+ private ConnectPoint buildConnectPoint(FlowRule rule) {
+ PortNumber port = getOutput(rule);
+
+ if (port == null) {
+ return null;
+ }
+ ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+ return cp;
+ }
+
+ private PortNumber getOutput(FlowRule rule) {
+ for (Instruction i : rule.treatment().allInstructions()) {
+ if (i.type() == Instruction.Type.OUTPUT) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+ return out.port();
+ }
+ if (i.type() == Instruction.Type.DROP) {
+ return PortNumber.P0;
+ }
+ }
+ return null;
+ }
+}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java index 487fad9b..da4e3cc4 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java @@ -21,6 +21,7 @@ import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED; import static org.slf4j.LoggerFactory.getLogger; import java.util.Collections; +import java.util.Map; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -40,6 +41,7 @@ import org.onosproject.net.Device; import org.onosproject.net.DeviceId; import org.onosproject.net.Link; import org.onosproject.net.Path; +import org.onosproject.net.DisjointPath; import org.onosproject.net.provider.ProviderId; import org.onosproject.net.topology.ClusterId; import org.onosproject.net.topology.DefaultGraphDescription; @@ -74,7 +76,6 @@ public class DistributedTopologyStore implements TopologyStore { private final Logger log = getLogger(getClass()); - private volatile DefaultTopology current = new DefaultTopology(ProviderId.NONE, new DefaultGraphDescription(0L, System.currentTimeMillis(), @@ -167,6 +168,29 @@ public class DistributedTopologyStore } @Override + public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst) { + return defaultTopology(topology).getDisjointPaths(src, dst); + } + + @Override + public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst, + LinkWeight weight) { + return defaultTopology(topology).getDisjointPaths(src, dst, weight); + } + + @Override + public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst, + Map<Link, Object> riskProfile) { + return defaultTopology(topology).getDisjointPaths(src, dst, riskProfile); + } + + @Override + public Set<DisjointPath> getDisjointPaths(Topology topology, DeviceId src, DeviceId dst, + LinkWeight weight, Map<Link, Object> riskProfile) { + return defaultTopology(topology).getDisjointPaths(src, dst, weight, riskProfile); + } + + @Override public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) { return defaultTopology(topology).isInfrastructure(connectPoint); } diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java new file mode 100644 index 00000000..a7077a81 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.host.impl; + +import junit.framework.TestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.packet.IpAddress; +import org.onlab.packet.MacAddress; +import org.onosproject.net.Host; +import org.onosproject.net.HostId; +import org.onosproject.net.HostLocation; +import org.onosproject.net.host.DefaultHostDescription; +import org.onosproject.net.host.HostDescription; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.Timestamp; +import org.onosproject.store.service.LogicalClockService; +import org.onosproject.store.service.TestStorageService; + +import java.util.HashSet; +import java.util.Set; + +/** + * Tests for the ECHostStore. + */ +public class ECHostStoreTest extends TestCase { + + private ECHostStore ecXHostStore; + + private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a")); + + private static final IpAddress IP1 = IpAddress.valueOf("10.2.0.2"); + private static final IpAddress IP2 = IpAddress.valueOf("10.2.0.3"); + + private static final ProviderId PID = new ProviderId("of", "foo"); + + @Before + public void setUp() { + ecXHostStore = new ECHostStore(); + + ecXHostStore.storageService = new TestStorageService(); + ecXHostStore.clockService = new TestLogicalClockService(); + ecXHostStore.activate(); + } + + @After + public void tearDown() { + ecXHostStore.deactivate(); + } + + /** + * Tests the removeIp method call. + */ + @Test + public void testRemoveIp() { + Set<IpAddress> ips = new HashSet<>(); + ips.add(IP1); + ips.add(IP2); + + HostDescription description = new DefaultHostDescription(HOSTID.mac(), + HOSTID.vlanId(), + HostLocation.NONE, + ips); + ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false); + ecXHostStore.removeIp(HOSTID, IP1); + Host host = ecXHostStore.getHost(HOSTID); + + assertFalse(host.ipAddresses().contains(IP1)); + assertTrue(host.ipAddresses().contains(IP2)); + } + + /** + * Mocks the LogicalClockService class. + */ + class TestLogicalClockService implements LogicalClockService { + @Override + public Timestamp getTimestamp() { + return null; + } + } +}
\ No newline at end of file diff --git a/framework/src/onos/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/framework/src/onos/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java index 66ee7be7..5b5056cb 100644 --- a/framework/src/onos/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java +++ b/framework/src/onos/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java @@ -19,7 +19,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; - import org.onlab.packet.ChassisId; import org.onlab.packet.EthType; import org.onlab.packet.Ip4Address; @@ -85,11 +84,11 @@ import org.onosproject.net.device.PortStatistics; import org.onosproject.net.flow.CompletedBatchOperation; import org.onosproject.net.flow.DefaultFlowEntry; import org.onosproject.net.flow.DefaultFlowRule; +import org.onosproject.net.flow.DefaultTableStatisticsEntry; import org.onosproject.net.flow.DefaultTrafficSelector; import org.onosproject.net.flow.DefaultTrafficTreatment; import org.onosproject.net.flow.FlowEntry; import org.onosproject.net.flow.FlowId; -import org.onosproject.net.flow.FlowRule; import org.onosproject.net.flow.FlowRuleBatchEntry; import org.onosproject.net.flow.FlowRuleBatchEvent; import org.onosproject.net.flow.FlowRuleBatchOperation; @@ -97,6 +96,7 @@ import org.onosproject.net.flow.FlowRuleBatchRequest; import org.onosproject.net.flow.FlowRuleEvent; import org.onosproject.net.flow.FlowRuleExtPayLoad; import org.onosproject.net.flow.StoredFlowEntry; +import org.onosproject.net.flow.TableStatisticsEntry; import org.onosproject.net.flow.criteria.Criterion; import org.onosproject.net.flow.criteria.EthCriterion; import org.onosproject.net.flow.criteria.EthTypeCriterion; @@ -118,7 +118,6 @@ import org.onosproject.net.flow.criteria.MetadataCriterion; import org.onosproject.net.flow.criteria.MplsCriterion; import org.onosproject.net.flow.criteria.OchSignalCriterion; import org.onosproject.net.flow.criteria.OchSignalTypeCriterion; -import org.onosproject.net.flow.criteria.OpticalSignalTypeCriterion; import org.onosproject.net.flow.criteria.PortCriterion; import org.onosproject.net.flow.criteria.SctpPortCriterion; import org.onosproject.net.flow.criteria.TcpPortCriterion; @@ -302,7 +301,6 @@ public final class KryoNamespaces { DefaultHostDescription.class, DefaultFlowEntry.class, StoredFlowEntry.class, - FlowRule.Type.class, DefaultFlowRule.class, DefaultFlowEntry.class, DefaultPacketRequest.class, @@ -339,11 +337,11 @@ public final class KryoNamespaces { IndexedLambdaCriterion.class, OchSignalCriterion.class, OchSignalTypeCriterion.class, - OpticalSignalTypeCriterion.class, Criterion.class, Criterion.Type.class, DefaultTrafficTreatment.class, Instructions.DropInstruction.class, + Instructions.NoActionInstruction.class, Instructions.OutputInstruction.class, Instructions.GroupInstruction.class, Instructions.TableTypeTransition.class, @@ -425,7 +423,9 @@ public final class KryoNamespaces { DefaultAnnotations.class, PortStatistics.class, DefaultPortStatistics.class, - IntentDomainId.class + IntentDomainId.class, + TableStatisticsEntry.class, + DefaultTableStatisticsEntry.class ) .register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class) .register(new URISerializer(), URI.class) |