diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java new file mode 100644 index 00000000..3e73d8f4 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java @@ -0,0 +1,289 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.config.impl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ShortNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onlab.util.Tools; +import org.onosproject.net.config.Config; +import org.onosproject.net.config.ConfigApplyDelegate; +import org.onosproject.net.config.ConfigFactory; +import org.onosproject.net.config.NetworkConfigEvent; +import org.onosproject.net.config.NetworkConfigStore; +import org.onosproject.net.config.NetworkConfigStoreDelegate; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.ConsistentMapException; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.onosproject.net.config.NetworkConfigEvent.Type.*; + +/** + * Implementation of a distributed network configuration store. + */ +@Component(immediate = true) +@Service +public class DistributedNetworkConfigStore + extends AbstractStore<NetworkConfigEvent, NetworkConfigStoreDelegate> + implements NetworkConfigStore { + + private static final int MAX_BACKOFF = 10; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + private ConsistentMap<ConfigKey, JsonNode> configs; + + private final Map<String, ConfigFactory> factoriesByConfig = Maps.newConcurrentMap(); + private final ObjectMapper mapper = new ObjectMapper(); + private final ConfigApplyDelegate applyDelegate = new InternalApplyDelegate(); + private final MapEventListener<ConfigKey, JsonNode> listener = new InternalMapListener(); + + @Activate + public void activate() { + KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder() + .register(KryoNamespaces.API) + .register(ConfigKey.class, ObjectNode.class, ArrayNode.class, + JsonNodeFactory.class, LinkedHashMap.class, + TextNode.class, BooleanNode.class, + LongNode.class, DoubleNode.class, ShortNode.class, IntNode.class); + + configs = storageService.<ConfigKey, JsonNode>consistentMapBuilder() + .withSerializer(Serializer.using(kryoBuilder.build())) + .withName("onos-network-configs") + .withRelaxedReadConsistency() + .build(); + configs.addListener(listener); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + configs.removeListener(listener); + log.info("Stopped"); + } + + @Override + public void addConfigFactory(ConfigFactory configFactory) { + factoriesByConfig.put(configFactory.configClass().getName(), configFactory); + notifyDelegate(new NetworkConfigEvent(CONFIG_REGISTERED, configFactory.configKey(), + configFactory.configClass())); + } + + @Override + public void removeConfigFactory(ConfigFactory configFactory) { + factoriesByConfig.remove(configFactory.configClass().getName()); + notifyDelegate(new NetworkConfigEvent(CONFIG_UNREGISTERED, configFactory.configKey(), + configFactory.configClass())); + } + + @Override + @SuppressWarnings("unchecked") + public <S, C extends Config<S>> ConfigFactory<S, C> getConfigFactory(Class<C> configClass) { + return (ConfigFactory<S, C>) factoriesByConfig.get(configClass.getName()); + } + + @Override + @SuppressWarnings("unchecked") + public <S> Set<S> getSubjects(Class<S> subjectClass) { + ImmutableSet.Builder<S> builder = ImmutableSet.builder(); + configs.keySet().forEach(k -> { + if (subjectClass.isInstance(k.subject)) { + builder.add((S) k.subject); + } + }); + return builder.build(); + } + + @Override + @SuppressWarnings("unchecked") + public <S, C extends Config<S>> Set<S> getSubjects(Class<S> subjectClass, Class<C> configClass) { + ImmutableSet.Builder<S> builder = ImmutableSet.builder(); + String cName = configClass.getName(); + configs.keySet().forEach(k -> { + if (subjectClass.isInstance(k.subject) && cName.equals(k.configClass)) { + builder.add((S) k.subject); + } + }); + return builder.build(); + } + + @Override + @SuppressWarnings("unchecked") + public <S> Set<Class<? extends Config<S>>> getConfigClasses(S subject) { + ImmutableSet.Builder<Class<? extends Config<S>>> builder = ImmutableSet.builder(); + configs.keySet().forEach(k -> { + if (Objects.equals(subject, k.subject) && delegate != null) { + builder.add(factoriesByConfig.get(k.configClass).configClass()); + } + }); + return builder.build(); + } + + @Override + public <S, T extends Config<S>> T getConfig(S subject, Class<T> configClass) { + // TODO: need to identify and address the root cause for timeouts. + Versioned<JsonNode> json = Tools.retryable(configs::get, ConsistentMapException.class, 1, MAX_BACKOFF) + .apply(key(subject, configClass)); + return json != null ? createConfig(subject, configClass, json.value()) : null; + } + + + @Override + public <S, C extends Config<S>> C createConfig(S subject, Class<C> configClass) { + ConfigFactory<S, C> factory = getConfigFactory(configClass); + Versioned<JsonNode> json = configs.computeIfAbsent(key(subject, configClass), + k -> factory.isList() ? + mapper.createArrayNode() : + mapper.createObjectNode()); + return createConfig(subject, configClass, json.value()); + } + + @Override + public <S, C extends Config<S>> C applyConfig(S subject, Class<C> configClass, JsonNode json) { + return createConfig(subject, configClass, + configs.putAndGet(key(subject, configClass), json).value()); + } + + @Override + public <S, C extends Config<S>> void clearConfig(S subject, Class<C> configClass) { + configs.remove(key(subject, configClass)); + } + + /** + * Produces a config from the specified subject, config class and raw JSON. + * + * @param subject config subject + * @param configClass config class + * @param json raw JSON data + * @return config object or null of no factory found or if the specified + * JSON is null + */ + @SuppressWarnings("unchecked") + private <S, C extends Config<S>> C createConfig(S subject, Class<C> configClass, + JsonNode json) { + if (json != null) { + ConfigFactory<S, C> factory = factoriesByConfig.get(configClass.getName()); + if (factory != null) { + C config = factory.createConfig(); + config.init(subject, factory.configKey(), json, mapper, applyDelegate); + return config; + } + } + return null; + } + + + // Auxiliary delegate to receive notifications about changes applied to + // the network configuration - by the apps. + private class InternalApplyDelegate implements ConfigApplyDelegate { + @Override + public void onApply(Config config) { + configs.put(key(config.subject(), config.getClass()), config.node()); + } + } + + // Produces a key for uniquely tracking a subject config. + private static ConfigKey key(Object subject, Class<?> configClass) { + return new ConfigKey(subject, configClass); + } + + // Auxiliary key to track subject configurations. + private static final class ConfigKey { + final Object subject; + final String configClass; + + private ConfigKey(Object subject, Class<?> configClass) { + this.subject = subject; + this.configClass = configClass.getName(); + } + + @Override + public int hashCode() { + return Objects.hash(subject, configClass); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof ConfigKey) { + final ConfigKey other = (ConfigKey) obj; + return Objects.equals(this.subject, other.subject) + && Objects.equals(this.configClass, other.configClass); + } + return false; + } + } + + private class InternalMapListener implements MapEventListener<ConfigKey, JsonNode> { + @Override + public void event(MapEvent<ConfigKey, JsonNode> event) { + NetworkConfigEvent.Type type; + switch (event.type()) { + case INSERT: + type = CONFIG_ADDED; + break; + case UPDATE: + type = CONFIG_UPDATED; + break; + case REMOVE: + default: + type = CONFIG_REMOVED; + break; + } + ConfigFactory factory = factoriesByConfig.get(event.key().configClass); + if (factory != null) { + notifyDelegate(new NetworkConfigEvent(type, event.key().subject, + factory.configClass())); + } + } + } +} |