/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.store.config.impl; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.DoubleNode; import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.LongNode; import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ShortNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.util.KryoNamespace; import org.onlab.util.Tools; import org.onosproject.net.config.Config; import org.onosproject.net.config.ConfigApplyDelegate; import org.onosproject.net.config.ConfigFactory; import org.onosproject.net.config.NetworkConfigEvent; import org.onosproject.net.config.NetworkConfigStore; import org.onosproject.net.config.NetworkConfigStoreDelegate; import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.ConsistentMapException; import org.onosproject.store.service.MapEvent; import org.onosproject.store.service.MapEventListener; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; import static org.onosproject.net.config.NetworkConfigEvent.Type.*; /** * Implementation of a distributed network configuration store. */ @Component(immediate = true) @Service public class DistributedNetworkConfigStore extends AbstractStore implements NetworkConfigStore { private final Logger log = LoggerFactory.getLogger(getClass()); private static final int MAX_BACKOFF = 10; private static final String INVALID_CONFIG_JSON = "JSON node does not contain valid configuration"; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected StorageService storageService; private ConsistentMap configs; private final Map factoriesByConfig = Maps.newConcurrentMap(); private final ObjectMapper mapper = new ObjectMapper(); private final ConfigApplyDelegate applyDelegate = new InternalApplyDelegate(); private final MapEventListener listener = new InternalMapListener(); @Activate public void activate() { KryoNamespace.Builder kryoBuilder = new KryoNamespace.Builder() .register(KryoNamespaces.API) .register(ConfigKey.class, ObjectNode.class, ArrayNode.class, JsonNodeFactory.class, LinkedHashMap.class, TextNode.class, BooleanNode.class, LongNode.class, DoubleNode.class, ShortNode.class, IntNode.class, NullNode.class); configs = storageService.consistentMapBuilder() .withSerializer(Serializer.using(kryoBuilder.build())) .withName("onos-network-configs") .withRelaxedReadConsistency() .build(); configs.addListener(listener); log.info("Started"); } @Deactivate public void deactivate() { configs.removeListener(listener); log.info("Stopped"); } @Override public void addConfigFactory(ConfigFactory configFactory) { factoriesByConfig.put(configFactory.configClass().getName(), configFactory); notifyDelegate(new NetworkConfigEvent(CONFIG_REGISTERED, configFactory.configKey(), configFactory.configClass())); } @Override public void removeConfigFactory(ConfigFactory configFactory) { factoriesByConfig.remove(configFactory.configClass().getName()); notifyDelegate(new NetworkConfigEvent(CONFIG_UNREGISTERED, configFactory.configKey(), configFactory.configClass())); } @Override @SuppressWarnings("unchecked") public > ConfigFactory getConfigFactory(Class configClass) { return (ConfigFactory) factoriesByConfig.get(configClass.getName()); } @Override @SuppressWarnings("unchecked") public Set getSubjects(Class subjectClass) { ImmutableSet.Builder builder = ImmutableSet.builder(); configs.keySet().forEach(k -> { if (subjectClass.isInstance(k.subject)) { builder.add((S) k.subject); } }); return builder.build(); } @Override @SuppressWarnings("unchecked") public > Set getSubjects(Class subjectClass, Class configClass) { ImmutableSet.Builder builder = ImmutableSet.builder(); String cName = configClass.getName(); configs.keySet().forEach(k -> { if (subjectClass.isInstance(k.subject) && cName.equals(k.configClass)) { builder.add((S) k.subject); } }); return builder.build(); } @Override @SuppressWarnings("unchecked") public Set>> getConfigClasses(S subject) { ImmutableSet.Builder>> builder = ImmutableSet.builder(); configs.keySet().forEach(k -> { if (Objects.equals(subject, k.subject) && delegate != null) { builder.add(factoriesByConfig.get(k.configClass).configClass()); } }); return builder.build(); } @Override public > T getConfig(S subject, Class configClass) { // TODO: need to identify and address the root cause for timeouts. Versioned json = Tools.retryable(configs::get, ConsistentMapException.class, 1, MAX_BACKOFF) .apply(key(subject, configClass)); return json != null ? createConfig(subject, configClass, json.value()) : null; } @Override public > C createConfig(S subject, Class configClass) { ConfigFactory factory = getConfigFactory(configClass); Versioned json = configs.computeIfAbsent(key(subject, configClass), k -> factory.isList() ? mapper.createArrayNode() : mapper.createObjectNode()); return createConfig(subject, configClass, json.value()); } @Override public > C applyConfig(S subject, Class configClass, JsonNode json) { // Create the configuration and validate it. C config = createConfig(subject, configClass, json); checkArgument(config.isValid(), INVALID_CONFIG_JSON); // Insert the validated configuration and get it back. Versioned versioned = configs.putAndGet(key(subject, configClass), json); // Re-create the config if for some reason what we attempted to put // was supplanted by someone else already. return versioned.value() == json ? config : createConfig(subject, configClass, versioned.value()); } @Override public > void clearConfig(S subject, Class configClass) { configs.remove(key(subject, configClass)); } /** * Produces a config from the specified subject, config class and raw JSON. * * @param subject config subject * @param configClass config class * @param json raw JSON data * @return config object or null of no factory found or if the specified * JSON is null */ @SuppressWarnings("unchecked") private > C createConfig(S subject, Class configClass, JsonNode json) { if (json != null) { ConfigFactory factory = factoriesByConfig.get(configClass.getName()); if (factory != null) { C config = factory.createConfig(); config.init(subject, factory.configKey(), json, mapper, applyDelegate); return config; } } return null; } // Auxiliary delegate to receive notifications about changes applied to // the network configuration - by the apps. private class InternalApplyDelegate implements ConfigApplyDelegate { @Override public void onApply(Config config) { configs.put(key(config.subject(), config.getClass()), config.node()); } } // Produces a key for uniquely tracking a subject config. private static ConfigKey key(Object subject, Class configClass) { return new ConfigKey(subject, configClass); } // Auxiliary key to track subject configurations. private static final class ConfigKey { final Object subject; final String configClass; private ConfigKey(Object subject, Class configClass) { this.subject = subject; this.configClass = configClass.getName(); } @Override public int hashCode() { return Objects.hash(subject, configClass); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj instanceof ConfigKey) { final ConfigKey other = (ConfigKey) obj; return Objects.equals(this.subject, other.subject) && Objects.equals(this.configClass, other.configClass); } return false; } } private class InternalMapListener implements MapEventListener { @Override public void event(MapEvent event) { NetworkConfigEvent.Type type; switch (event.type()) { case INSERT: type = CONFIG_ADDED; break; case UPDATE: type = CONFIG_UPDATED; break; case REMOVE: default: type = CONFIG_REMOVED; break; } ConfigFactory factory = factoriesByConfig.get(event.key().configClass); if (factory != null) { notifyDelegate(new NetworkConfigEvent(type, event.key().subject, factory.configClass())); } } } }