diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src')
18 files changed, 423 insertions, 262 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java index 6764c222..fe4aa0be 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java @@ -17,7 +17,9 @@ package org.onosproject.store.app; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableSet; - +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -37,6 +39,7 @@ import org.onosproject.common.app.ApplicationArchive; import org.onosproject.core.Application; import org.onosproject.core.ApplicationId; import org.onosproject.core.ApplicationIdStore; +import org.onosproject.core.CoreService; import org.onosproject.core.DefaultApplication; import org.onosproject.security.Permission; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; @@ -61,6 +64,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; +import static com.google.common.collect.Multimaps.newSetMultimap; +import static com.google.common.collect.Multimaps.synchronizedSetMultimap; import static com.google.common.io.ByteStreams.toByteArray; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.onlab.util.Tools.groupedThreads; @@ -115,6 +120,14 @@ public class GossipApplicationStore extends ApplicationArchive @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ApplicationIdStore idStore; + // Multimap to track which apps are required by others apps + // app -> { required-by, ... } + // Apps explicitly activated will be required by the CORE app + private final Multimap<ApplicationId, ApplicationId> requiredBy = + synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet)); + + private ApplicationId coreAppId; + @Activate public void activate() { KryoNamespace.Builder serializer = KryoNamespace.newBuilder() @@ -128,16 +141,16 @@ public class GossipApplicationStore extends ApplicationArchive groupedThreads("onos/store/app", "message-handler")); clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST, - bytes -> new String(bytes, Charsets.UTF_8), - name -> { - try { - return toByteArray(getApplicationInputStream(name)); - } catch (IOException e) { - throw new StorageException(e); - } - }, - Function.identity(), - messageHandlingExecutor); + bytes -> new String(bytes, Charsets.UTF_8), + name -> { + try { + return toByteArray(getApplicationInputStream(name)); + } catch (IOException e) { + throw new StorageException(e); + } + }, + Function.identity(), + messageHandlingExecutor); // FIXME: Consider consolidating into a single map. @@ -161,6 +174,7 @@ public class GossipApplicationStore extends ApplicationArchive .withTimestampProvider((k, v) -> clockService.getTimestamp()) .build(); + coreAppId = getId(CoreService.CORE_APP_NAME); log.info("Started"); } @@ -169,20 +183,34 @@ public class GossipApplicationStore extends ApplicationArchive * they are marked to be active. */ private void loadFromDisk() { - for (String name : getApplicationNames()) { - for (int i = 0; i < MAX_LOAD_RETRIES; i++) { - try { - Application app = create(getApplicationDescription(name), false); - if (app != null && isActive(app.id().name())) { - activate(app.id(), false); - // load app permissions - } - } catch (Exception e) { - log.warn("Unable to load application {} from disk; retrying", name); - randomDelay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake + getApplicationNames().forEach(appName -> { + Application app = loadFromDisk(appName); + if (app != null && isActive(app.id().name())) { + activate(app.id(), false); + // TODO Load app permissions + } + }); + } + + private Application loadFromDisk(String appName) { + for (int i = 0; i < MAX_LOAD_RETRIES; i++) { + try { + // Directly return if app already exists + ApplicationId appId = getId(appName); + if (appId != null) { + return getApplication(appId); } + + ApplicationDescription appDesc = getApplicationDescription(appName); + boolean success = appDesc.requiredApps().stream() + .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null); + return success ? create(appDesc, false) : null; + } catch (Exception e) { + log.warn("Unable to load application {} from disk; retrying", appName); + randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon } } + return null; } @Deactivate @@ -200,7 +228,6 @@ public class GossipApplicationStore extends ApplicationArchive public void setDelegate(ApplicationStoreDelegate delegate) { super.setDelegate(delegate); loadFromDisk(); -// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS); } @Override @@ -229,7 +256,15 @@ public class GossipApplicationStore extends ApplicationArchive @Override public Application create(InputStream appDescStream) { ApplicationDescription appDesc = saveApplication(appDescStream); - return create(appDesc, true); + if (hasPrerequisites(appDesc)) { + return create(appDesc, true); + } + throw new ApplicationException("Missing dependencies for app " + appDesc.name()); + } + + private boolean hasPrerequisites(ApplicationDescription app) { + return !app.requiredApps().stream().map(n -> getId(n)) + .anyMatch(id -> id == null || getApplication(id) == null); } private Application create(ApplicationDescription appDesc, boolean updateTime) { @@ -246,36 +281,80 @@ public class GossipApplicationStore extends ApplicationArchive public void remove(ApplicationId appId) { Application app = apps.get(appId); if (app != null) { + uninstallDependentApps(app); apps.remove(appId); states.remove(app); permissions.remove(app); } } + // Uninstalls all apps that depend on the given app. + private void uninstallDependentApps(Application app) { + getApplications().stream() + .filter(a -> a.requiredApps().contains(app.id().name())) + .forEach(a -> remove(a.id())); + } + @Override public void activate(ApplicationId appId) { + activate(appId, coreAppId); + } + + private void activate(ApplicationId appId, ApplicationId forAppId) { + requiredBy.put(appId, forAppId); activate(appId, true); } + private void activate(ApplicationId appId, boolean updateTime) { Application app = apps.get(appId); if (app != null) { if (updateTime) { updateTime(appId.name()); } + activateRequiredApps(app); states.put(app, ACTIVATED); } } + // Activates all apps required by this application. + private void activateRequiredApps(Application app) { + app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id())); + } + @Override public void deactivate(ApplicationId appId) { - Application app = apps.get(appId); - if (app != null) { - updateTime(appId.name()); - states.put(app, DEACTIVATED); + deactivateDependentApps(getApplication(appId)); + deactivate(appId, coreAppId); + } + + private void deactivate(ApplicationId appId, ApplicationId forAppId) { + requiredBy.remove(appId, forAppId); + if (requiredBy.get(appId).isEmpty()) { + Application app = apps.get(appId); + if (app != null) { + updateTime(appId.name()); + states.put(app, DEACTIVATED); + deactivateRequiredApps(app); + } } } + // Deactivates all apps that require this application. + private void deactivateDependentApps(Application app) { + getApplications().stream() + .filter(a -> states.get(a) == ACTIVATED) + .filter(a -> a.requiredApps().contains(app.id().name())) + .forEach(a -> deactivate(a.id())); + } + + // Deactivates all apps required by this application. + private void deactivateRequiredApps(Application app) { + app.requiredApps().stream().map(this::getId).map(this::getApplication) + .filter(a -> states.get(a) == ACTIVATED) + .forEach(a -> deactivate(a.id(), app.id())); + } + @Override public Set<Permission> getPermissions(ApplicationId appId) { Application app = apps.get(appId); @@ -424,6 +503,7 @@ public class GossipApplicationStore extends ApplicationArchive ApplicationId appId = idStore.registerApplication(appDesc.name()); return new DefaultApplication(appId, appDesc.version(), appDesc.description(), appDesc.origin(), appDesc.role(), appDesc.permissions(), - appDesc.featuresRepo(), appDesc.features()); + appDesc.featuresRepo(), appDesc.features(), + appDesc.requiredApps()); } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java index 9f6c4130..3cd992bb 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java @@ -7,6 +7,7 @@ import static org.slf4j.LoggerFactory.getLogger; import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.net.Inet4Address; import java.net.NetworkInterface; import java.util.Arrays; import java.util.Collection; @@ -55,6 +56,11 @@ public class StaticClusterMetadataStore implements ClusterMetadataStore { private final Logger log = getLogger(getClass()); + + private static final String ONOS_IP = "ONOS_IP"; + private static final String ONOS_INTERFACE = "ONOS_INTERFACE"; + private static final String ONOS_ALLOW_IPV6 = "ONOS_ALLOW_IPV6"; + private static final String DEFAULT_ONOS_INTERFACE = "eth0"; private static final String CLUSTER_METADATA_FILE = "../config/cluster.json"; private static final int DEFAULT_ONOS_PORT = 9876; private final File metadataFile = new File(CLUSTER_METADATA_FILE); @@ -194,28 +200,59 @@ public class StaticClusterMetadataStore private static String getSiteLocalAddress() { + + /* + * If the IP ONOS should use is set via the environment variable we will assume it is valid and should be used. + * Setting the IP address takes presidence over setting the interface via the environment. + */ + String useOnosIp = System.getenv(ONOS_IP); + if (useOnosIp != null) { + return useOnosIp; + } + + // Read environment variables for IP interface information or set to default + String useOnosInterface = System.getenv(ONOS_INTERFACE); + if (useOnosInterface == null) { + useOnosInterface = DEFAULT_ONOS_INTERFACE; + } + + // Capture if they want to limit IP address selection to only IPv4 (default). + boolean allowIPv6 = (System.getenv(ONOS_ALLOW_IPV6) != null); + Function<NetworkInterface, IpAddress> ipLookup = nif -> { - for (InetAddress address : Collections.list(nif.getInetAddresses())) { - if (address.isSiteLocalAddress()) { - return IpAddress.valueOf(address); + IpAddress fallback = null; + + // nif can be null if the interface name specified doesn't exist on the node's host + if (nif != null) { + for (InetAddress address : Collections.list(nif.getInetAddresses())) { + if (address.isSiteLocalAddress() && (allowIPv6 || address instanceof Inet4Address)) { + return IpAddress.valueOf(address); + } + if (fallback == null && !address.isLoopbackAddress() && !address.isMulticastAddress() + && (allowIPv6 || address instanceof Inet4Address)) { + fallback = IpAddress.valueOf(address); + } } } - return null; + return fallback; }; try { - IpAddress ip = ipLookup.apply(NetworkInterface.getByName("eth0")); + IpAddress ip = ipLookup.apply(NetworkInterface.getByName(useOnosInterface)); if (ip != null) { return ip.toString(); } for (NetworkInterface nif : Collections.list(getNetworkInterfaces())) { - ip = ipLookup.apply(nif); - if (ip != null) { - return ip.toString(); + if (!nif.getName().equals(useOnosInterface)) { + ip = ipLookup.apply(nif); + if (ip != null) { + return ip.toString(); + } } } } catch (Exception e) { throw new IllegalStateException("Unable to get network interfaces", e); } + return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString(); } -}
\ No newline at end of file +} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java index 23c81869..d61d7dcf 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java @@ -47,7 +47,7 @@ public class NettyMessagingManager extends NettyMessaging { @Activate public void activate() throws Exception { ControllerNode localNode = clusterMetadataService.getLocalNode(); - getTLSParameters(); + getTlsParameters(); super.start(new Endpoint(localNode.ip(), localNode.tcpPort())); log.info("Started"); } @@ -58,29 +58,29 @@ public class NettyMessagingManager extends NettyMessaging { log.info("Stopped"); } - private void getTLSParameters() { + private void getTlsParameters() { String tempString = System.getProperty("enableNettyTLS"); - enableNettyTLS = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString); - log.info("enableNettyTLS = {}", enableNettyTLS); - if (enableNettyTLS) { + enableNettyTls = Strings.isNullOrEmpty(tempString) ? TLS_DISABLED : Boolean.parseBoolean(tempString); + log.info("enableNettyTLS = {}", enableNettyTls); + if (enableNettyTls) { ksLocation = System.getProperty("javax.net.ssl.keyStore"); if (Strings.isNullOrEmpty(ksLocation)) { - enableNettyTLS = TLS_DISABLED; + enableNettyTls = TLS_DISABLED; return; } tsLocation = System.getProperty("javax.net.ssl.trustStore"); if (Strings.isNullOrEmpty(tsLocation)) { - enableNettyTLS = TLS_DISABLED; + enableNettyTls = TLS_DISABLED; return; } ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray(); if (MIN_KS_LENGTH > ksPwd.length) { - enableNettyTLS = TLS_DISABLED; + enableNettyTls = TLS_DISABLED; return; } tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray(); if (MIN_KS_LENGTH > tsPwd.length) { - enableNettyTLS = TLS_DISABLED; + enableNettyTls = TLS_DISABLED; return; } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java index 3e73d8f4..ca8eea37 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java @@ -60,6 +60,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import static com.google.common.base.Preconditions.checkArgument; import static org.onosproject.net.config.NetworkConfigEvent.Type.*; /** @@ -71,10 +72,12 @@ public class DistributedNetworkConfigStore extends AbstractStore<NetworkConfigEvent, NetworkConfigStoreDelegate> implements NetworkConfigStore { - private static final int MAX_BACKOFF = 10; - private final Logger log = LoggerFactory.getLogger(getClass()); + private static final int MAX_BACKOFF = 10; + private static final String INVALID_CONFIG_JSON = + "JSON node does not contain valid configuration"; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected StorageService storageService; @@ -187,8 +190,17 @@ public class DistributedNetworkConfigStore @Override public <S, C extends Config<S>> C applyConfig(S subject, Class<C> configClass, JsonNode json) { - return createConfig(subject, configClass, - configs.putAndGet(key(subject, configClass), json).value()); + // Create the configuration and validate it. + C config = createConfig(subject, configClass, json); + checkArgument(config.isValid(), INVALID_CONFIG_JSON); + + // Insert the validated configuration and get it back. + Versioned<JsonNode> versioned = configs.putAndGet(key(subject, configClass), json); + + // Re-create the config if for some reason what we attempted to put + // was supplanted by someone else already. + return versioned.value() == json ? config : + createConfig(subject, configClass, versioned.value()); } @Override diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java index 7e575b01..92db5b44 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java @@ -26,8 +26,12 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; /** - * Extension of DefaultAsyncConsistentMap that provides a weaker read consistency + * Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency * guarantee in return for better read performance. + * <p> + * For read/write operations that are local to a node this map implementation provides + * guarantees similar to a ConsistentMap. However for read/write operations executed + * across multiple nodes this implementation only provides eventual consistency. * * @param <K> key type * @param <V> value type @@ -68,4 +72,10 @@ public class AsyncCachingConsistentMap<K, V> extends DefaultAsyncConsistentMap<K } return cache.getUnchecked(key); } + + @Override + protected void beforeUpdate(K key) { + super.beforeUpdate(key); + cache.invalidate(key); + } }
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java index 3e89635a..90d81ee7 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java @@ -55,6 +55,7 @@ import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.NodeId; import org.onosproject.core.ApplicationId; import org.onosproject.core.IdGenerator; +import org.onosproject.persistence.PersistenceService; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; import org.onosproject.store.service.AtomicCounterBuilder; @@ -128,6 +129,9 @@ public class DatabaseManager implements StorageService, StorageAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterCommunicationService clusterCommunicator; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected PersistenceService persistenceService; + protected String nodeIdToUri(NodeId nodeId) { ControllerNode node = clusterService.getNode(nodeId); return String.format("onos://%s:%d", node.ip(), node.tcpPort()); @@ -312,7 +316,8 @@ public class DatabaseManager implements StorageService, StorageAdminService { @Override public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() { return new EventuallyConsistentMapBuilderImpl<>(clusterService, - clusterCommunicator); + clusterCommunicator, + persistenceService); } @Override diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java index 0ea66861..c6d300c9 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java @@ -405,6 +405,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V .thenApply(v -> v.updated()); } + /** + * Pre-update hook for performing required checks/actions before going forward with an update operation. + * @param key map key. + */ + protected void beforeUpdate(K key) { + checkIfUnmodifiable(); + } + private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) { return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode)); } @@ -413,7 +421,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V Match<V> oldValueMatch, Match<Long> oldVersionMatch, V value) { - checkIfUnmodifiable(); + beforeUpdate(key); return database.mapUpdate(name, keyCache.getUnchecked(key), oldValueMatch.map(serializer::encode), diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java index 687762e0..a9a9098e 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java @@ -986,6 +986,10 @@ public class GossipDeviceStore // accept removal request if given timestamp is newer than // the latest Timestamp from Primary provider DeviceDescriptions primDescs = getPrimaryDescriptions(descs); + if (primDescs == null) { + return null; + } + Timestamp lastTimestamp = primDescs.getLatestTimestamp(); if (timestamp.compareTo(lastTimestamp) <= 0) { // outdated event ignore @@ -1036,7 +1040,7 @@ public class GossipDeviceStore checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied"); - ProviderId primary = pickPrimaryPID(providerDescs); + ProviderId primary = pickPrimaryPid(providerDescs); DeviceDescriptions desc = providerDescs.get(primary); @@ -1099,7 +1103,7 @@ public class GossipDeviceStore private Port composePort(Device device, PortNumber number, Map<ProviderId, DeviceDescriptions> descsMap) { - ProviderId primary = pickPrimaryPID(descsMap); + ProviderId primary = pickPrimaryPid(descsMap); DeviceDescriptions primDescs = descsMap.get(primary); // if no primary, assume not enabled boolean isEnabled = false; @@ -1145,7 +1149,7 @@ public class GossipDeviceStore /** * @return primary ProviderID, or randomly chosen one if none exists */ - private ProviderId pickPrimaryPID( + private ProviderId pickPrimaryPid( Map<ProviderId, DeviceDescriptions> providerDescs) { ProviderId fallBackPrimary = null; for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { @@ -1161,7 +1165,7 @@ public class GossipDeviceStore private DeviceDescriptions getPrimaryDescriptions( Map<ProviderId, DeviceDescriptions> providerDescs) { - ProviderId pid = pickPrimaryPID(providerDescs); + ProviderId pid = pickPrimaryPid(providerDescs); return providerDescs.get(pid); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java index a553ffff..eb98c829 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java @@ -18,6 +18,7 @@ package org.onosproject.store.ecmap; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.NodeId; +import org.onosproject.persistence.PersistenceService; import org.onosproject.store.Timestamp; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.service.EventuallyConsistentMap; @@ -52,6 +53,8 @@ public class EventuallyConsistentMapBuilderImpl<K, V> private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS; private boolean convergeFaster = false; private boolean persistent = false; + private boolean persistentMap = false; + private final PersistenceService persistenceService; /** * Creates a new eventually consistent map builder. @@ -60,7 +63,9 @@ public class EventuallyConsistentMapBuilderImpl<K, V> * @param clusterCommunicator cluster communication service */ public EventuallyConsistentMapBuilderImpl(ClusterService clusterService, - ClusterCommunicationService clusterCommunicator) { + ClusterCommunicationService clusterCommunicator, + PersistenceService persistenceService) { + this.persistenceService = persistenceService; this.clusterService = checkNotNull(clusterService); this.clusterCommunicator = checkNotNull(clusterCommunicator); } @@ -133,6 +138,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V> @Override public EventuallyConsistentMapBuilder<K, V> withPersistence() { + checkNotNull(this.persistenceService); persistent = true; return this; } @@ -156,6 +162,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V> antiEntropyPeriod, antiEntropyTimeUnit, convergeFaster, - persistent); + persistent, + persistenceService); } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java index f1e0dbd4..b5ea52e0 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java @@ -28,6 +28,7 @@ import org.onlab.util.SlidingWindowCounter; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.NodeId; +import org.onosproject.persistence.PersistenceService; import org.onosproject.store.Timestamp; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; @@ -37,6 +38,7 @@ import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.EventuallyConsistentMap; import org.onosproject.store.service.EventuallyConsistentMapEvent; import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.Serializer; import org.onosproject.store.service.WallClockTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +83,7 @@ public class EventuallyConsistentMapImpl<K, V> private final ClusterCommunicationService clusterCommunicator; private final KryoSerializer serializer; private final NodeId localNodeId; + private final PersistenceService persistenceService; private final BiFunction<K, V, Timestamp> timestampProvider; @@ -116,7 +119,9 @@ public class EventuallyConsistentMapImpl<K, V> private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE); private final boolean persistent; - private final PersistentStore<K, V> persistentStore; + + private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap"; + /** * Creates a new eventually consistent map shared amongst multiple instances. @@ -158,9 +163,32 @@ public class EventuallyConsistentMapImpl<K, V> long antiEntropyPeriod, TimeUnit antiEntropyTimeUnit, boolean convergeFaster, - boolean persistent) { + boolean persistent, + PersistenceService persistenceService) { this.mapName = mapName; - items = Maps.newConcurrentMap(); + this.serializer = createSerializer(serializerBuilder); + this.persistenceService = persistenceService; + this.persistent = + persistent; + if (persistent) { + items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder() + .withName(PERSISTENT_LOCAL_MAP_NAME) + .withSerializer(new Serializer() { + + @Override + public <T> byte[] encode(T object) { + return EventuallyConsistentMapImpl.this.serializer.encode(object); + } + + @Override + public <T> T decode(byte[] bytes) { + return EventuallyConsistentMapImpl.this.serializer.decode(bytes); + } + }) + .build(); + } else { + items = Maps.newConcurrentMap(); + } senderPending = Maps.newConcurrentMap(); destroyedMessage = mapName + ERROR_DESTROYED; @@ -168,8 +196,6 @@ public class EventuallyConsistentMapImpl<K, V> this.clusterCommunicator = clusterCommunicator; this.localNodeId = clusterService.getLocalNode().id(); - this.serializer = createSerializer(serializerBuilder); - this.timestampProvider = timestampProvider; if (peerUpdateFunction != null) { @@ -198,20 +224,6 @@ public class EventuallyConsistentMapImpl<K, V> newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d")); } - this.persistent = persistent; - - if (this.persistent) { - String dataDirectory = System.getProperty("karaf.data", "./data"); - String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName; - - ExecutorService dbExecutor = - newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter")); - - persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer); - persistentStore.readInto(items); - } else { - this.persistentStore = null; - } if (backgroundExecutor != null) { this.backgroundExecutor = backgroundExecutor; @@ -373,15 +385,6 @@ public class EventuallyConsistentMapImpl<K, V> return existing; } }); - if (updated.get()) { - if (persistent) { - if (tombstone.isPresent()) { - persistentStore.update(key, tombstone.get()); - } else { - persistentStore.remove(key); - } - } - } return previousValue.get(); } @@ -455,6 +458,7 @@ public class EventuallyConsistentMapImpl<K, V> /** * Returns true if newValue was accepted i.e. map is updated. + * * @param key key * @param newValue proposed new value * @return true if update happened; false if map already contains a more recent value for the key @@ -473,9 +477,6 @@ public class EventuallyConsistentMapImpl<K, V> } return existing; }); - if (updated.get() && persistent) { - persistentStore.update(key, newValue); - } return updated.get(); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java index 8cd63e7d..1695e5ff 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java @@ -59,6 +59,7 @@ import org.onosproject.net.flow.FlowRuleStore; import org.onosproject.net.flow.FlowRuleStoreDelegate; import org.onosproject.net.flow.StoredFlowEntry; import org.onosproject.net.flow.TableStatisticsEntry; +import org.onosproject.persistence.PersistenceService; import org.onosproject.store.AbstractStore; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.ClusterMessage; @@ -74,6 +75,7 @@ import org.onosproject.store.serializers.custom.DistributedStoreSerializers; import org.onosproject.store.service.EventuallyConsistentMap; import org.onosproject.store.service.EventuallyConsistentMapEvent; import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.WallClockTimestamp; import org.osgi.service.component.ComponentContext; @@ -113,6 +115,7 @@ public class NewDistributedFlowRuleStore private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8; private static final boolean DEFAULT_BACKUP_ENABLED = true; + private static final boolean DEFAULT_PERSISTENCE_ENABLED = false; private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000; private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000; // number of devices whose flow entries will be backed up in one communication round @@ -129,6 +132,9 @@ public class NewDistributedFlowRuleStore @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS, label = "Delay in ms between successive backup runs") private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS; + @Property(name = "persistenceEnabled", boolValue = false, + label = "Indicates whether or not changes in the flow table should be persisted to disk.") + private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED; private InternalFlowTable flowTable = new InternalFlowTable(); @@ -153,6 +159,9 @@ public class NewDistributedFlowRuleStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected MastershipService mastershipService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected PersistenceService persistenceService; + private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap(); private ExecutorService messageHandlingExecutor; @@ -716,7 +725,25 @@ public class NewDistributedFlowRuleStore * @return Map representing Flow Table of given device. */ private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) { - return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()); + if (persistenceEnabled) { + return flowEntries.computeIfAbsent(deviceId, id -> persistenceService + .<FlowId, Set<StoredFlowEntry>>persistentMapBuilder() + .withName("FlowTable:" + deviceId.toString()) + .withSerializer(new Serializer() { + @Override + public <T> byte[] encode(T object) { + return SERIALIZER.encode(object); + } + + @Override + public <T> T decode(byte[] bytes) { + return SERIALIZER.decode(bytes); + } + }) + .build()); + } else { + return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()); + } } private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) { diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java index f9c96891..20124576 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java @@ -19,8 +19,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.onosproject.net.DefaultAnnotations.merge; import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED; -import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED; +import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED; import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; @@ -28,9 +28,10 @@ import static org.slf4j.LoggerFactory.getLogger; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -56,7 +57,6 @@ import org.onosproject.net.host.HostDescription; import org.onosproject.net.host.HostEvent; import org.onosproject.net.host.HostStore; import org.onosproject.net.host.HostStoreDelegate; -import org.onosproject.net.host.HostEvent.Type; import org.onosproject.net.provider.ProviderId; import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; @@ -67,10 +67,7 @@ import org.onosproject.store.service.LogicalClockService; import org.onosproject.store.service.StorageService; import org.slf4j.Logger; -import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimaps; -import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; /** @@ -90,13 +87,11 @@ public class ECHostStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected LogicalClockService clockService; - // Hosts tracked by their location - private final SetMultimap<ConnectPoint, Host> locations = - Multimaps.synchronizedSetMultimap( - HashMultimap.<ConnectPoint, Host>create()); - private EventuallyConsistentMap<HostId, DefaultHost> hosts; + private final ConcurrentHashMap<HostId, DefaultHost> prevHosts = + new ConcurrentHashMap<>(); + private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker = new HostLocationTracker(); @@ -120,11 +115,12 @@ public class ECHostStore public void deactivate() { hosts.removeListener(hostLocationTracker); hosts.destroy(); - locations.clear(); + prevHosts.clear(); log.info("Stopped"); } + // TODO No longer need to return HostEvent @Override public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId, @@ -133,18 +129,7 @@ public class ECHostStore // TODO: We need a way to detect conflicting changes and abort update. // (BOC) Compute might do this for us. - final AtomicReference<Type> eventType = new AtomicReference<>(); - final AtomicReference<DefaultHost> oldHost = new AtomicReference<>(); - DefaultHost host = hosts.compute(hostId, (id, existingHost) -> { - if (existingHost != null) { - oldHost.set(existingHost); - checkState(Objects.equals(hostDescription.hwAddress(), existingHost.mac()), - "Existing and new MAC addresses differ."); - checkState(Objects.equals(hostDescription.vlan(), existingHost.vlan()), - "Existing and new VLANs differ."); - } - - // TODO do we ever want the existing location? + hosts.compute(hostId, (id, existingHost) -> { HostLocation location = hostDescription.location(); final Set<IpAddress> addresses; @@ -163,15 +148,6 @@ public class ECHostStore annotations = hostDescription.annotations(); } - if (existingHost == null) { - eventType.set(HOST_ADDED); - } else if (!Objects.equals(existingHost.location(), hostDescription.location())) { - eventType.set(HOST_MOVED); - } else if (!existingHost.ipAddresses().containsAll(hostDescription.ipAddress()) || - !hostDescription.annotations().keys().isEmpty()) { - eventType.set(HOST_UPDATED); - } // else, eventType == null; this means we don't send an event - return new DefaultHost(providerId, hostId, hostDescription.hwAddress(), @@ -181,24 +157,20 @@ public class ECHostStore annotations); }); - if (oldHost.get() != null) { - DefaultHost old = oldHost.get(); - locations.remove(old.location(), old); - } - locations.put(host.location(), host); - - return eventType.get() != null ? new HostEvent(eventType.get(), host) : null; + return null; } + // TODO No longer need to return HostEvent @Override public HostEvent removeHost(HostId hostId) { - Host host = hosts.remove(hostId); - return host != null ? new HostEvent(HOST_REMOVED, host) : null; + hosts.remove(hostId); + return null; } + // TODO No longer need to return HostEvent @Override public HostEvent removeIp(HostId hostId, IpAddress ipAddress) { - DefaultHost host = hosts.compute(hostId, (id, existingHost) -> { + hosts.compute(hostId, (id, existingHost) -> { if (existingHost != null) { checkState(Objects.equals(hostId.mac(), existingHost.mac()), "Existing and new MAC addresses differ."); @@ -222,7 +194,7 @@ public class ECHostStore } return null; }); - return host != null ? new HostEvent(HOST_UPDATED, host) : null; + return null; } @Override @@ -257,22 +229,19 @@ public class ECHostStore @Override public Set<Host> getConnectedHosts(ConnectPoint connectPoint) { - synchronized (locations) { - return ImmutableSet.copyOf(locations.get(connectPoint)); - } + Set<Host> filtered = hosts.entrySet().stream() + .filter(entry -> entry.getValue().location().equals(connectPoint)) + .map(Map.Entry::getValue) + .collect(Collectors.toSet()); + return ImmutableSet.copyOf(filtered); } @Override public Set<Host> getConnectedHosts(DeviceId deviceId) { - Set<Host> filtered; - synchronized (locations) { - filtered = locations - .entries() - .stream() - .filter(entry -> entry.getKey().deviceId().equals(deviceId)) - .map(entry -> entry.getValue()) - .collect(Collectors.toSet()); - } + Set<Host> filtered = hosts.entrySet().stream() + .filter(entry -> entry.getValue().location().deviceId().equals(deviceId)) + .map(Map.Entry::getValue) + .collect(Collectors.toSet()); return ImmutableSet.copyOf(filtered); } @@ -285,13 +254,18 @@ public class ECHostStore public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) { DefaultHost host = checkNotNull(event.value()); if (event.type() == PUT) { - boolean isNew = locations.put(host.location(), host); - notifyDelegate(new HostEvent(isNew ? HOST_ADDED : HOST_UPDATED, host)); + Host prevHost = prevHosts.put(host.id(), host); + if (prevHost == null) { + notifyDelegate(new HostEvent(HOST_ADDED, host)); + } else if (!Objects.equals(prevHost.location(), host.location())) { + notifyDelegate(new HostEvent(HOST_MOVED, host, prevHost)); + } else if (!Objects.equals(prevHost, host)) { + notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost)); + } } else if (event.type() == REMOVE) { - if (locations.remove(host.location(), host)) { + if (prevHosts.remove(host.id()) != null) { notifyDelegate(new HostEvent(HOST_REMOVED, host)); } - } } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java index 9382960f..dfee9980 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/impl/LogicalTimestamp.java @@ -50,7 +50,7 @@ public class LogicalTimestamp implements Timestamp { @Override public int hashCode() { - return Objects.hash(value); + return Long.hashCode(value); } @Override diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java index 885361f0..f6cd198f 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionId.java @@ -56,7 +56,7 @@ public class PartitionId { @Override public int hashCode() { - return Objects.hash(id); + return id; } @Override diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java index 687576c3..0335ba5d 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java @@ -16,20 +16,23 @@ package org.onosproject.store.newresource.impl; import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableList; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onosproject.net.newresource.ResourceConsumer; +import org.onosproject.net.newresource.ResourceEvent; import org.onosproject.net.newresource.ResourcePath; import org.onosproject.net.newresource.ResourceStore; +import org.onosproject.net.newresource.ResourceStoreDelegate; +import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionException; import org.onosproject.store.service.TransactionalMap; import org.onosproject.store.service.Versioned; import org.slf4j.Logger; @@ -48,6 +51,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.onosproject.net.newresource.ResourceEvent.Type.*; /** * Implementation of ResourceStore using TransactionalMap. @@ -55,7 +59,8 @@ import static com.google.common.base.Preconditions.checkNotNull; @Component(immediate = true) @Service @Beta -public class ConsistentResourceStore implements ResourceStore { +public class ConsistentResourceStore extends AbstractStore<ResourceEvent, ResourceStoreDelegate> + implements ResourceStore { private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class); private static final String CONSUMER_MAP = "onos-resource-consumers"; @@ -79,6 +84,8 @@ public class ConsistentResourceStore implements ResourceStore { .withName(CHILD_MAP) .withSerializer(SERIALIZER) .build(); + + childMap.put(ResourcePath.ROOT, ImmutableList.of()); } @Override @@ -100,29 +107,32 @@ public class ConsistentResourceStore implements ResourceStore { TransactionContext tx = service.transactionContextBuilder().build(); tx.begin(); - try { - TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = - tx.getTransactionalMap(CHILD_MAP, SERIALIZER); + TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = + tx.getTransactionalMap(CHILD_MAP, SERIALIZER); - Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream() - .filter(x -> x.parent().isPresent()) - .collect(Collectors.groupingBy(x -> x.parent().get())); + Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream() + .filter(x -> x.parent().isPresent()) + .collect(Collectors.groupingBy(x -> x.parent().get())); - for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) { - if (!isRegistered(childTxMap, entry.getKey())) { - return abortTransaction(tx); - } + for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) { + if (!isRegistered(childTxMap, entry.getKey())) { + return abortTransaction(tx); + } - if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) { - return abortTransaction(tx); - } + if (!appendValues(childTxMap, entry.getKey(), entry.getValue())) { + return abortTransaction(tx); } + } - return commitTransaction(tx); - } catch (TransactionException e) { - log.error("Exception thrown, abort the transaction", e); - return abortTransaction(tx); + boolean success = tx.commit(); + if (success) { + List<ResourceEvent> events = resources.stream() + .filter(x -> x.parent().isPresent()) + .map(x -> new ResourceEvent(RESOURCE_ADDED, x)) + .collect(Collectors.toList()); + notifyDelegate(events); } + return success; } @Override @@ -132,33 +142,36 @@ public class ConsistentResourceStore implements ResourceStore { TransactionContext tx = service.transactionContextBuilder().build(); tx.begin(); - try { - TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = - tx.getTransactionalMap(CHILD_MAP, SERIALIZER); - TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = - tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); + TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = + tx.getTransactionalMap(CHILD_MAP, SERIALIZER); + TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = + tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); - Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream() - .filter(x -> x.parent().isPresent()) - .collect(Collectors.groupingBy(x -> x.parent().get())); - - // even if one of the resources is allocated to a consumer, - // all unregistrations are regarded as failure - for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) { - if (entry.getValue().stream().anyMatch(x -> consumerTxMap.get(x) != null)) { - return abortTransaction(tx); - } - - if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) { - return abortTransaction(tx); - } + Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream() + .filter(x -> x.parent().isPresent()) + .collect(Collectors.groupingBy(x -> x.parent().get())); + + // even if one of the resources is allocated to a consumer, + // all unregistrations are regarded as failure + for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) { + if (entry.getValue().stream().anyMatch(x -> consumerTxMap.get(x) != null)) { + return abortTransaction(tx); } - return commitTransaction(tx); - } catch (TransactionException e) { - log.error("Exception thrown, abort the transaction", e); - return abortTransaction(tx); + if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) { + return abortTransaction(tx); + } + } + + boolean success = tx.commit(); + if (success) { + List<ResourceEvent> events = resources.stream() + .filter(x -> x.parent().isPresent()) + .map(x -> new ResourceEvent(RESOURCE_REMOVED, x)) + .collect(Collectors.toList()); + notifyDelegate(events); } + return success; } @Override @@ -169,28 +182,23 @@ public class ConsistentResourceStore implements ResourceStore { TransactionContext tx = service.transactionContextBuilder().build(); tx.begin(); - try { - TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = - tx.getTransactionalMap(CHILD_MAP, SERIALIZER); - TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = - tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); - - for (ResourcePath resource: resources) { - if (!isRegistered(childTxMap, resource)) { - return abortTransaction(tx); - } - - ResourceConsumer oldValue = consumerTxMap.put(resource, consumer); - if (oldValue != null) { - return abortTransaction(tx); - } + TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap = + tx.getTransactionalMap(CHILD_MAP, SERIALIZER); + TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = + tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); + + for (ResourcePath resource: resources) { + if (!isRegistered(childTxMap, resource)) { + return abortTransaction(tx); } - return commitTransaction(tx); - } catch (TransactionException e) { - log.error("Exception thrown, abort the transaction", e); - return abortTransaction(tx); + ResourceConsumer oldValue = consumerTxMap.put(resource, consumer); + if (oldValue != null) { + return abortTransaction(tx); + } } + + return tx.commit(); } @Override @@ -202,28 +210,23 @@ public class ConsistentResourceStore implements ResourceStore { TransactionContext tx = service.transactionContextBuilder().build(); tx.begin(); - try { - TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = - tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); - Iterator<ResourcePath> resourceIte = resources.iterator(); - Iterator<ResourceConsumer> consumerIte = consumers.iterator(); - - while (resourceIte.hasNext() && consumerIte.hasNext()) { - ResourcePath resource = resourceIte.next(); - ResourceConsumer consumer = consumerIte.next(); - - // if this single release fails (because the resource is allocated to another consumer, - // the whole release fails - if (!consumerTxMap.remove(resource, consumer)) { - return abortTransaction(tx); - } - } + TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap = + tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER); + Iterator<ResourcePath> resourceIte = resources.iterator(); + Iterator<ResourceConsumer> consumerIte = consumers.iterator(); + + while (resourceIte.hasNext() && consumerIte.hasNext()) { + ResourcePath resource = resourceIte.next(); + ResourceConsumer consumer = consumerIte.next(); - return commitTransaction(tx); - } catch (TransactionException e) { - log.error("Exception thrown, abort the transaction", e); - return abortTransaction(tx); + // if this single release fails (because the resource is allocated to another consumer, + // the whole release fails + if (!consumerTxMap.remove(resource, consumer)) { + return abortTransaction(tx); + } } + + return tx.commit(); } @Override @@ -261,7 +264,7 @@ public class ConsistentResourceStore implements ResourceStore { } return children.value().stream() - .filter(x -> x.lastComponent().getClass().equals(cls)) + .filter(x -> x.last().getClass().equals(cls)) .filter(consumerMap::containsKey) .collect(Collectors.toList()); } @@ -278,17 +281,6 @@ public class ConsistentResourceStore implements ResourceStore { } /** - * Commit the transaction. - * - * @param tx transaction context - * @return always true - */ - private boolean commitTransaction(TransactionContext tx) { - tx.commit(); - return true; - } - - /** * Appends the values to the existing values associated with the specified key. * If the map already has all the given values, appending will not happen. * @@ -300,9 +292,9 @@ public class ConsistentResourceStore implements ResourceStore { * @return true if the operation succeeds, false otherwise. */ private <K, V> boolean appendValues(TransactionalMap<K, List<V>> map, K key, List<V> values) { - List<V> oldValues = map.get(key); + List<V> oldValues = map.putIfAbsent(key, new ArrayList<>(values)); if (oldValues == null) { - return map.replace(key, oldValues, new ArrayList<>(values)); + return true; } LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues); @@ -329,7 +321,8 @@ public class ConsistentResourceStore implements ResourceStore { private <K, V> boolean removeValues(TransactionalMap<K, List<V>> map, K key, List<V> values) { List<V> oldValues = map.get(key); if (oldValues == null) { - return map.replace(key, oldValues, new ArrayList<>()); + map.put(key, new ArrayList<>()); + return true; } LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues); @@ -351,7 +344,7 @@ public class ConsistentResourceStore implements ResourceStore { */ private boolean isRegistered(TransactionalMap<ResourcePath, List<ResourcePath>> map, ResourcePath resource) { // root is always regarded to be registered - if (resource.isRoot()) { + if (!resource.parent().isPresent()) { return true; } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java index c332ada5..351c7a5f 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentLinkResourceStore.java @@ -59,7 +59,6 @@ import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionException; import org.onosproject.store.service.TransactionalMap; import org.onosproject.store.service.Versioned; @@ -73,7 +72,10 @@ import static org.onosproject.net.AnnotationKeys.BANDWIDTH; /** * Store that manages link resources using Copycat-backed TransactionalMaps. + * + * @deprecated in Emu Release */ +@Deprecated @Component(immediate = true, enabled = true) @Service public class ConsistentLinkResourceStore extends @@ -294,7 +296,7 @@ public class ConsistentLinkResourceStore extends intentAllocs.put(allocations.intentId(), allocations); allocations.links().forEach(link -> allocateLinkResource(tx, link, allocations)); tx.commit(); - } catch (TransactionException | ResourceAllocationException e) { + } catch (ResourceAllocationException e) { log.error("Exception thrown, rolling back", e); tx.abort(); } catch (Exception e) { @@ -407,12 +409,8 @@ public class ConsistentLinkResourceStore extends after.remove(allocations); linkAllocs.replace(linkId, before, after); }); - tx.commit(); - success = true; - } catch (TransactionException e) { - log.debug("Transaction failed, retrying", e); - tx.abort(); - } catch (Exception e) { + success = tx.commit(); + } catch (Exception e) { log.error("Exception thrown during releaseResource {}", allocations, e); tx.abort(); throw e; diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java index 3a168936..5cbf360a 100644 --- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java @@ -232,7 +232,7 @@ public class GossipDeviceStoreTest { if (expected == actual) { return; } - assertEquals(expected.deviceURI(), actual.deviceURI()); + assertEquals(expected.deviceUri(), actual.deviceUri()); assertEquals(expected.hwVersion(), actual.hwVersion()); assertEquals(expected.manufacturer(), actual.manufacturer()); assertEquals(expected.serialNumber(), actual.serialNumber()); @@ -247,7 +247,7 @@ public class GossipDeviceStoreTest { if (expected == actual) { return; } - assertEquals(expected.deviceURI(), actual.deviceURI()); + assertEquals(expected.deviceUri(), actual.deviceUri()); assertEquals(expected.hwVersion(), actual.hwVersion()); assertEquals(expected.manufacturer(), actual.manufacturer()); assertEquals(expected.serialNumber(), actual.serialNumber()); diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java index ccf6ee71..ef8d9924 100644 --- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java @@ -42,6 +42,7 @@ import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.DefaultControllerNode; import org.onosproject.cluster.NodeId; import org.onosproject.event.AbstractEvent; +import org.onosproject.persistence.impl.PersistenceManager; import org.onosproject.store.Timestamp; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter; @@ -81,6 +82,7 @@ public class EventuallyConsistentMapImplTest { private EventuallyConsistentMap<String, String> ecMap; + private PersistenceManager persistenceService; private ClusterService clusterService; private ClusterCommunicationService clusterCommunicator; private SequentialClockService<String, String> clockService; @@ -136,6 +138,8 @@ public class EventuallyConsistentMapImplTest { clusterCommunicator = createMock(ClusterCommunicationService.class); + persistenceService = new PersistenceManager(); + persistenceService.activate(); // Add expectation for adding cluster message subscribers which // delegate to our ClusterCommunicationService implementation. This // allows us to get a reference to the map's internal cluster message @@ -153,11 +157,12 @@ public class EventuallyConsistentMapImplTest { .register(TestTimestamp.class); ecMap = new EventuallyConsistentMapBuilderImpl<String, String>( - clusterService, clusterCommunicator) + clusterService, clusterCommunicator, persistenceService) .withName(MAP_NAME) .withSerializer(serializer) .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v)) .withCommunicationExecutor(MoreExecutors.newDirectExecutorService()) + .withPersistence() .build(); // Reset ready for tests to add their own expectations |