diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap')
8 files changed, 0 insertions, 1327 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java deleted file mode 100644 index d783fe22..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/AntiEntropyAdvertisement.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.ecmap; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableMap; - -import org.onosproject.cluster.NodeId; - -import java.util.Map; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Anti-entropy advertisement message for eventually consistent map. - */ -public class AntiEntropyAdvertisement<K> { - - private final NodeId sender; - private final Map<K, MapValue.Digest> digest; - - /** - * Creates a new anti entropy advertisement message. - * - * @param sender the sender's node ID - * @param digest for map entries - */ - public AntiEntropyAdvertisement(NodeId sender, - Map<K, MapValue.Digest> digest) { - this.sender = checkNotNull(sender); - this.digest = ImmutableMap.copyOf(checkNotNull(digest)); - } - - /** - * Returns the sender's node ID. - * - * @return the sender's node ID - */ - public NodeId sender() { - return sender; - } - - /** - * Returns the digest for map entries. - * - * @return mapping from key to associated digest - */ - public Map<K, MapValue.Digest> digest() { - return digest; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("sender", sender) - .add("totalEntries", digest.size()) - .toString(); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java deleted file mode 100644 index eb98c829..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.ecmap; - -import org.onlab.util.KryoNamespace; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.NodeId; -import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.Timestamp; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapBuilder; - -import java.util.Collection; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Eventually consistent map builder. - */ -public class EventuallyConsistentMapBuilderImpl<K, V> - implements EventuallyConsistentMapBuilder<K, V> { - private final ClusterService clusterService; - private final ClusterCommunicationService clusterCommunicator; - - private String name; - private KryoNamespace.Builder serializerBuilder; - private ExecutorService eventExecutor; - private ExecutorService communicationExecutor; - private ScheduledExecutorService backgroundExecutor; - private BiFunction<K, V, Timestamp> timestampProvider; - private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction; - private boolean tombstonesDisabled = false; - private long antiEntropyPeriod = 5; - private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS; - private boolean convergeFaster = false; - private boolean persistent = false; - private boolean persistentMap = false; - private final PersistenceService persistenceService; - - /** - * Creates a new eventually consistent map builder. - * - * @param clusterService cluster service - * @param clusterCommunicator cluster communication service - */ - public EventuallyConsistentMapBuilderImpl(ClusterService clusterService, - ClusterCommunicationService clusterCommunicator, - PersistenceService persistenceService) { - this.persistenceService = persistenceService; - this.clusterService = checkNotNull(clusterService); - this.clusterCommunicator = checkNotNull(clusterCommunicator); - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withName(String name) { - this.name = checkNotNull(name); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withSerializer( - KryoNamespace.Builder serializerBuilder) { - this.serializerBuilder = checkNotNull(serializerBuilder); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withTimestampProvider( - BiFunction<K, V, Timestamp> timestampProvider) { - this.timestampProvider = checkNotNull(timestampProvider); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withEventExecutor(ExecutorService executor) { - this.eventExecutor = checkNotNull(executor); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor( - ExecutorService executor) { - communicationExecutor = checkNotNull(executor); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(ScheduledExecutorService executor) { - this.backgroundExecutor = checkNotNull(executor); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction( - BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) { - this.peerUpdateFunction = checkNotNull(peerUpdateFunction); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled() { - tombstonesDisabled = true; - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(long period, TimeUnit unit) { - checkArgument(period > 0, "anti-entropy period must be greater than 0"); - antiEntropyPeriod = period; - antiEntropyTimeUnit = checkNotNull(unit); - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withFasterConvergence() { - convergeFaster = true; - return this; - } - - @Override - public EventuallyConsistentMapBuilder<K, V> withPersistence() { - checkNotNull(this.persistenceService); - persistent = true; - return this; - } - - @Override - public EventuallyConsistentMap<K, V> build() { - checkNotNull(name, "name is a mandatory parameter"); - checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter"); - checkNotNull(timestampProvider, "timestampProvider is a mandatory parameter"); - - return new EventuallyConsistentMapImpl<>(name, - clusterService, - clusterCommunicator, - serializerBuilder, - timestampProvider, - peerUpdateFunction, - eventExecutor, - communicationExecutor, - backgroundExecutor, - tombstonesDisabled, - antiEntropyPeriod, - antiEntropyTimeUnit, - convergeFaster, - persistent, - persistenceService); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java deleted file mode 100644 index b5ea52e0..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java +++ /dev/null @@ -1,679 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.ecmap; - -import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.tuple.Pair; -import org.onlab.util.AbstractAccumulator; -import org.onlab.util.KryoNamespace; -import org.onlab.util.SlidingWindowCounter; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.Timestamp; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.impl.LogicalTimestamp; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.WallClockTimestamp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.Timer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; -import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; -import static org.onlab.util.Tools.groupedThreads; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; - -/** - * Distributed Map implementation which uses optimistic replication and gossip - * based techniques to provide an eventually consistent data store. - */ -public class EventuallyConsistentMapImpl<K, V> - implements EventuallyConsistentMap<K, V> { - - private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class); - - private final Map<K, MapValue<V>> items; - - private final ClusterService clusterService; - private final ClusterCommunicationService clusterCommunicator; - private final KryoSerializer serializer; - private final NodeId localNodeId; - private final PersistenceService persistenceService; - - private final BiFunction<K, V, Timestamp> timestampProvider; - - private final MessageSubject updateMessageSubject; - private final MessageSubject antiEntropyAdvertisementSubject; - - private final Set<EventuallyConsistentMapListener<K, V>> listeners - = Sets.newCopyOnWriteArraySet(); - - private final ExecutorService executor; - private final ScheduledExecutorService backgroundExecutor; - private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction; - - private final ExecutorService communicationExecutor; - private final Map<NodeId, EventAccumulator> senderPending; - - private final String mapName; - - private volatile boolean destroyed = false; - private static final String ERROR_DESTROYED = " map is already destroyed"; - private final String destroyedMessage; - - private static final String ERROR_NULL_KEY = "Key cannot be null"; - private static final String ERROR_NULL_VALUE = "Null values are not allowed"; - - private final long initialDelaySec = 5; - private final boolean lightweightAntiEntropy; - private final boolean tombstonesDisabled; - - private static final int WINDOW_SIZE = 5; - private static final int HIGH_LOAD_THRESHOLD = 0; - private static final int LOAD_WINDOW = 2; - private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE); - - private final boolean persistent; - - private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap"; - - - /** - * Creates a new eventually consistent map shared amongst multiple instances. - * <p> - * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder} - * for more description of the parameters expected by the map. - * </p> - * - * @param mapName a String identifier for the map. - * @param clusterService the cluster service - * @param clusterCommunicator the cluster communications service - * @param serializerBuilder a Kryo namespace builder that can serialize - * both K and V - * @param timestampProvider provider of timestamps for K and V - * @param peerUpdateFunction function that provides a set of nodes to immediately - * update to when there writes to the map - * @param eventExecutor executor to use for processing incoming - * events from peers - * @param communicationExecutor executor to use for sending events to peers - * @param backgroundExecutor executor to use for background anti-entropy - * tasks - * @param tombstonesDisabled true if this map should not maintain - * tombstones - * @param antiEntropyPeriod period that the anti-entropy task should run - * @param antiEntropyTimeUnit time unit for anti-entropy period - * @param convergeFaster make anti-entropy try to converge faster - * @param persistent persist data to disk - */ - EventuallyConsistentMapImpl(String mapName, - ClusterService clusterService, - ClusterCommunicationService clusterCommunicator, - KryoNamespace.Builder serializerBuilder, - BiFunction<K, V, Timestamp> timestampProvider, - BiFunction<K, V, Collection<NodeId>> peerUpdateFunction, - ExecutorService eventExecutor, - ExecutorService communicationExecutor, - ScheduledExecutorService backgroundExecutor, - boolean tombstonesDisabled, - long antiEntropyPeriod, - TimeUnit antiEntropyTimeUnit, - boolean convergeFaster, - boolean persistent, - PersistenceService persistenceService) { - this.mapName = mapName; - this.serializer = createSerializer(serializerBuilder); - this.persistenceService = persistenceService; - this.persistent = - persistent; - if (persistent) { - items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder() - .withName(PERSISTENT_LOCAL_MAP_NAME) - .withSerializer(new Serializer() { - - @Override - public <T> byte[] encode(T object) { - return EventuallyConsistentMapImpl.this.serializer.encode(object); - } - - @Override - public <T> T decode(byte[] bytes) { - return EventuallyConsistentMapImpl.this.serializer.decode(bytes); - } - }) - .build(); - } else { - items = Maps.newConcurrentMap(); - } - senderPending = Maps.newConcurrentMap(); - destroyedMessage = mapName + ERROR_DESTROYED; - - this.clusterService = clusterService; - this.clusterCommunicator = clusterCommunicator; - this.localNodeId = clusterService.getLocalNode().id(); - - this.timestampProvider = timestampProvider; - - if (peerUpdateFunction != null) { - this.peerUpdateFunction = peerUpdateFunction; - } else { - this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream() - .map(ControllerNode::id) - .filter(nodeId -> !nodeId.equals(localNodeId)) - .collect(Collectors.toList()); - } - - if (eventExecutor != null) { - this.executor = eventExecutor; - } else { - // should be a normal executor; it's used for receiving messages - this.executor = - Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d")); - } - - if (communicationExecutor != null) { - this.communicationExecutor = communicationExecutor; - } else { - // sending executor; should be capped - //TODO this probably doesn't need to be bounded anymore - this.communicationExecutor = - newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d")); - } - - - if (backgroundExecutor != null) { - this.backgroundExecutor = backgroundExecutor; - } else { - this.backgroundExecutor = - newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d")); - } - - // start anti-entropy thread - this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement, - initialDelaySec, antiEntropyPeriod, - antiEntropyTimeUnit); - - updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update"); - clusterCommunicator.addSubscriber(updateMessageSubject, - serializer::decode, - this::processUpdates, - this.executor); - - antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy"); - clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject, - serializer::decode, - this::handleAntiEntropyAdvertisement, - this.backgroundExecutor); - - this.tombstonesDisabled = tombstonesDisabled; - this.lightweightAntiEntropy = !convergeFaster; - } - - private KryoSerializer createSerializer(KryoNamespace.Builder builder) { - return new KryoSerializer() { - @Override - protected void setupKryoPool() { - // Add the map's internal helper classes to the user-supplied serializer - serializerPool = builder - .register(KryoNamespaces.BASIC) - .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID) - .register(LogicalTimestamp.class) - .register(WallClockTimestamp.class) - .register(AntiEntropyAdvertisement.class) - .register(UpdateEntry.class) - .register(MapValue.class) - .register(MapValue.Digest.class) - .build(); - } - }; - } - - @Override - public int size() { - checkState(!destroyed, destroyedMessage); - // TODO: Maintain a separate counter for tracking live elements in map. - return Maps.filterValues(items, MapValue::isAlive).size(); - } - - @Override - public boolean isEmpty() { - checkState(!destroyed, destroyedMessage); - return size() == 0; - } - - @Override - public boolean containsKey(K key) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - return get(key) != null; - } - - @Override - public boolean containsValue(V value) { - checkState(!destroyed, destroyedMessage); - checkNotNull(value, ERROR_NULL_VALUE); - return items.values() - .stream() - .filter(MapValue::isAlive) - .anyMatch(v -> value.equals(v.get())); - } - - @Override - public V get(K key) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - - MapValue<V> value = items.get(key); - return (value == null || value.isTombstone()) ? null : value.get(); - } - - @Override - public void put(K key, V value) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - - MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value)); - if (putInternal(key, newValue)) { - notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value)); - notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value)); - } - } - - @Override - public V remove(K key) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - return removeAndNotify(key, null); - } - - @Override - public void remove(K key, V value) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - removeAndNotify(key, value); - } - - private V removeAndNotify(K key, V value) { - Timestamp timestamp = timestampProvider.apply(key, value); - Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null - ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp)); - MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone); - if (previousValue != null) { - notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)), - peerUpdateFunction.apply(key, previousValue.get())); - if (previousValue.isAlive()) { - notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get())); - } - } - return previousValue != null ? previousValue.get() : null; - } - - private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - tombstone.ifPresent(v -> checkState(v.isTombstone())); - - counter.incrementCount(); - AtomicBoolean updated = new AtomicBoolean(false); - AtomicReference<MapValue<V>> previousValue = new AtomicReference<>(); - items.compute(key, (k, existing) -> { - boolean valueMatches = true; - if (value.isPresent() && existing != null && existing.isAlive()) { - valueMatches = Objects.equals(value.get(), existing.get()); - } - if (existing == null) { - log.trace("ECMap Remove: Existing value for key {} is already null", k); - } - if (valueMatches) { - if (existing == null) { - updated.set(tombstone.isPresent()); - } else { - updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing)); - } - } - if (updated.get()) { - previousValue.set(existing); - return tombstone.orElse(null); - } else { - return existing; - } - }); - return previousValue.get(); - } - - @Override - public V compute(K key, BiFunction<K, V, V> recomputeFunction) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(recomputeFunction, "Recompute function cannot be null"); - - AtomicBoolean updated = new AtomicBoolean(false); - AtomicReference<MapValue<V>> previousValue = new AtomicReference<>(); - MapValue<V> computedValue = items.compute(key, (k, mv) -> { - previousValue.set(mv); - V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get()); - MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue)); - if (mv == null || newValue.isNewerThan(mv)) { - updated.set(true); - return newValue; - } else { - return mv; - } - }); - if (updated.get()) { - notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get())); - EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT; - V value = computedValue.isTombstone() - ? previousValue.get() == null ? null : previousValue.get().get() - : computedValue.get(); - if (value != null) { - notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value)); - } - } - return computedValue.get(); - } - - @Override - public void putAll(Map<? extends K, ? extends V> m) { - checkState(!destroyed, destroyedMessage); - m.forEach(this::put); - } - - @Override - public void clear() { - checkState(!destroyed, destroyedMessage); - Maps.filterValues(items, MapValue::isAlive) - .forEach((k, v) -> remove(k)); - } - - @Override - public Set<K> keySet() { - checkState(!destroyed, destroyedMessage); - return Maps.filterValues(items, MapValue::isAlive) - .keySet(); - } - - @Override - public Collection<V> values() { - checkState(!destroyed, destroyedMessage); - return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get); - } - - @Override - public Set<Map.Entry<K, V>> entrySet() { - checkState(!destroyed, destroyedMessage); - return Maps.filterValues(items, MapValue::isAlive) - .entrySet() - .stream() - .map(e -> Pair.of(e.getKey(), e.getValue().get())) - .collect(Collectors.toSet()); - } - - /** - * Returns true if newValue was accepted i.e. map is updated. - * - * @param key key - * @param newValue proposed new value - * @return true if update happened; false if map already contains a more recent value for the key - */ - private boolean putInternal(K key, MapValue<V> newValue) { - checkState(!destroyed, destroyedMessage); - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(newValue, ERROR_NULL_VALUE); - checkState(newValue.isAlive()); - counter.incrementCount(); - AtomicBoolean updated = new AtomicBoolean(false); - items.compute(key, (k, existing) -> { - if (existing == null || newValue.isNewerThan(existing)) { - updated.set(true); - return newValue; - } - return existing; - }); - return updated.get(); - } - - @Override - public void addListener(EventuallyConsistentMapListener<K, V> listener) { - checkState(!destroyed, destroyedMessage); - - listeners.add(checkNotNull(listener)); - } - - @Override - public void removeListener(EventuallyConsistentMapListener<K, V> listener) { - checkState(!destroyed, destroyedMessage); - - listeners.remove(checkNotNull(listener)); - } - - @Override - public void destroy() { - destroyed = true; - - executor.shutdown(); - backgroundExecutor.shutdown(); - communicationExecutor.shutdown(); - - listeners.clear(); - - clusterCommunicator.removeSubscriber(updateMessageSubject); - clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject); - } - - private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) { - listeners.forEach(listener -> listener.event(event)); - } - - private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) { - queueUpdate(event, peers); - } - - private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) { - if (peers == null) { - // we have no friends :( - return; - } - peers.forEach(node -> - senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event) - ); - } - - private boolean underHighLoad() { - return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD; - } - - private void sendAdvertisement() { - try { - if (underHighLoad() || destroyed) { - return; - } - pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer); - } catch (Exception e) { - // Catch all exceptions to avoid scheduled task being suppressed. - log.error("Exception thrown while sending advertisement", e); - } - } - - private Optional<NodeId> pickRandomActivePeer() { - List<NodeId> activePeers = clusterService.getNodes() - .stream() - .map(ControllerNode::id) - .filter(id -> !localNodeId.equals(id)) - .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE) - .collect(Collectors.toList()); - Collections.shuffle(activePeers); - return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0)); - } - - private void sendAdvertisementToPeer(NodeId peer) { - clusterCommunicator.unicast(createAdvertisement(), - antiEntropyAdvertisementSubject, - serializer::encode, - peer) - .whenComplete((result, error) -> { - if (error != null) { - log.debug("Failed to send anti-entropy advertisement to {}", peer, error); - } - }); - } - - private AntiEntropyAdvertisement<K> createAdvertisement() { - return new AntiEntropyAdvertisement<K>(localNodeId, - ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest))); - } - - private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) { - if (destroyed || underHighLoad()) { - return; - } - try { - if (log.isTraceEnabled()) { - log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it", - mapName, ad.sender(), ad.digest().size()); - } - antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners); - - if (!lightweightAntiEntropy) { - // if remote ad has any entries that the local copy is missing, actively sync - // TODO: Missing keys is not the way local copy can be behind. - if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) { - // TODO: Send ad for missing keys and for entries that are stale - sendAdvertisementToPeer(ad.sender()); - } - } - } catch (Exception e) { - log.warn("Error handling anti-entropy advertisement", e); - } - } - - /** - * Processes anti-entropy ad from peer by taking following actions: - * 1. If peer has an old entry, updates peer. - * 2. If peer indicates an entry is removed and has a more recent - * timestamp than the local entry, update local state. - */ - private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems( - AntiEntropyAdvertisement<K> ad) { - final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList(); - final NodeId sender = ad.sender(); - items.forEach((key, localValue) -> { - MapValue.Digest remoteValueDigest = ad.digest().get(key); - if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) { - // local value is more recent, push to sender - queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender)); - } - if (remoteValueDigest != null - && remoteValueDigest.isNewerThan(localValue.digest()) - && remoteValueDigest.isTombstone()) { - MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp()); - MapValue<V> previousValue = removeInternal(key, - Optional.empty(), - Optional.of(tombstone)); - if (previousValue != null && previousValue.isAlive()) { - externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get())); - } - } - }); - return externalEvents; - } - - private void processUpdates(Collection<UpdateEntry<K, V>> updates) { - if (destroyed) { - return; - } - updates.forEach(update -> { - final K key = update.key(); - final MapValue<V> value = update.value(); - if (value == null || value.isTombstone()) { - MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value)); - if (previousValue != null && previousValue.isAlive()) { - notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get())); - } - } else if (putInternal(key, value)) { - notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get())); - } - }); - } - - // TODO pull this into the class if this gets pulled out... - private static final int DEFAULT_MAX_EVENTS = 1000; - private static final int DEFAULT_MAX_IDLE_MS = 10; - private static final int DEFAULT_MAX_BATCH_MS = 50; - private static final Timer TIMER = new Timer("onos-ecm-sender-events"); - - private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> { - - private final NodeId peer; - - private EventAccumulator(NodeId peer) { - super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS); - this.peer = peer; - } - - @Override - public void processItems(List<UpdateEntry<K, V>> items) { - Map<K, UpdateEntry<K, V>> map = Maps.newHashMap(); - items.forEach(item -> map.compute(item.key(), (key, existing) -> - item.isNewerThan(existing) ? item : existing)); - communicationExecutor.submit(() -> { - clusterCommunicator.unicast(ImmutableList.copyOf(map.values()), - updateMessageSubject, - serializer::encode, - peer) - .whenComplete((result, error) -> { - if (error != null) { - log.debug("Failed to send to {}", peer, error); - } - }); - }); - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java deleted file mode 100644 index f5ce47fc..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.ecmap; - -import org.mapdb.DB; -import org.mapdb.DBMaker; -import org.mapdb.Hasher; -import org.mapdb.Serializer; -import org.onosproject.store.serializers.KryoSerializer; - -import java.io.File; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * MapDB based implementation of a persistent store. - */ -class MapDbPersistentStore<K, V> implements PersistentStore<K, V> { - - private final ExecutorService executor; - private final KryoSerializer serializer; - - private final DB database; - - private final Map<byte[], byte[]> items; - - /** - * Creates a new MapDB based persistent store. - * - * @param filename filename of the database on disk - * @param executor executor to use for tasks that write to the disk - * @param serializer serializer for keys and values - */ - MapDbPersistentStore(String filename, ExecutorService executor, - KryoSerializer serializer) { - this.executor = checkNotNull(executor); - this.serializer = checkNotNull(serializer); - - File databaseFile = new File(filename); - - database = DBMaker.newFileDB(databaseFile).make(); - - items = database.createHashMap("items") - .keySerializer(Serializer.BYTE_ARRAY) - .valueSerializer(Serializer.BYTE_ARRAY) - .hasher(Hasher.BYTE_ARRAY) - .makeOrGet(); - } - - @Override - public void readInto(Map<K, MapValue<V>> items) { - this.items.forEach((keyBytes, valueBytes) -> - items.put(serializer.decode(keyBytes), - serializer.decode(valueBytes))); - } - - @Override - public void update(K key, MapValue<V> value) { - executor.submit(() -> updateInternal(key, value)); - } - - @Override - public void remove(K key) { - executor.submit(() -> removeInternal(key)); - } - - private void updateInternal(K key, MapValue<V> newValue) { - byte[] keyBytes = serializer.encode(key); - - items.compute(keyBytes, (k, existingBytes) -> { - MapValue<V> existing = existingBytes == null ? null : - serializer.decode(existingBytes); - if (existing == null || newValue.isNewerThan(existing)) { - return serializer.encode(newValue); - } else { - return existingBytes; - } - }); - database.commit(); - } - - private void removeInternal(K key) { - byte[] keyBytes = serializer.encode(key); - items.remove(keyBytes); - database.commit(); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java deleted file mode 100644 index bb69b472..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapValue.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.ecmap; - -import org.onosproject.store.Timestamp; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; - -/** - * Representation of a value in EventuallyConsistentMap. - * - * @param <V> value type - */ -public class MapValue<V> implements Comparable<MapValue<V>> { - private final Timestamp timestamp; - private final V value; - - /** - * Creates a tombstone value with the specified timestamp. - * @param timestamp timestamp for tombstone - * @return tombstone MapValue - * - * @param <U> value type - */ - public static <U> MapValue<U> tombstone(Timestamp timestamp) { - return new MapValue<>(null, timestamp); - } - - public MapValue(V value, Timestamp timestamp) { - this.value = value; - this.timestamp = timestamp; - } - - public boolean isTombstone() { - return value == null; - } - - public boolean isAlive() { - return value != null; - } - - public Timestamp timestamp() { - return timestamp; - } - - public V get() { - return value; - } - - @Override - public int compareTo(MapValue<V> o) { - return this.timestamp.compareTo(o.timestamp); - } - - public boolean isNewerThan(MapValue<V> other) { - return timestamp.isNewerThan(other.timestamp); - } - - public boolean isNewerThan(Timestamp timestamp) { - return this.timestamp.isNewerThan(timestamp); - } - - public Digest digest() { - return new Digest(timestamp, isTombstone()); - } - - @Override - public int hashCode() { - return Objects.hashCode(timestamp, value); - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object other) { - if (other instanceof MapValue) { - MapValue<V> that = (MapValue<V>) other; - return Objects.equal(this.timestamp, that.timestamp) && - Objects.equal(this.value, that.value); - } - return false; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("timestamp", timestamp) - .add("value", value) - .toString(); - } - - @SuppressWarnings("unused") - private MapValue() { - this.timestamp = null; - this.value = null; - } - - /** - * Digest or summary of a MapValue for use during Anti-Entropy exchanges. - */ - public static class Digest { - private final Timestamp timestamp; - private final boolean isTombstone; - - public Digest(Timestamp timestamp, boolean isTombstone) { - this.timestamp = timestamp; - this.isTombstone = isTombstone; - } - - public Timestamp timestamp() { - return timestamp; - } - - public boolean isTombstone() { - return isTombstone; - } - - public boolean isNewerThan(Digest other) { - return timestamp.isNewerThan(other.timestamp); - } - - @Override - public int hashCode() { - return Objects.hashCode(timestamp, isTombstone); - } - - @Override - public boolean equals(Object other) { - if (other instanceof Digest) { - Digest that = (Digest) other; - return Objects.equal(this.timestamp, that.timestamp) && - Objects.equal(this.isTombstone, that.isTombstone); - } - return false; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("timestamp", timestamp) - .add("isTombstone", isTombstone) - .toString(); - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java deleted file mode 100644 index e85987a7..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.ecmap; - -import java.util.Map; - -/** - * A persistent store for an eventually consistent map. - */ -interface PersistentStore<K, V> { - - /** - * Read the contents of the disk into the given maps. - * - * @param items items map - */ - void readInto(Map<K, MapValue<V>> items); - - /** - * Updates a key,value pair in the persistent store. - * - * @param key the key - * @param value the value - */ - void update(K key, MapValue<V> value); - - /** - * Removes a key from persistent store. - * - * @param key the key to remove - */ - void remove(K key); -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java deleted file mode 100644 index 53683b98..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/UpdateEntry.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.ecmap; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.MoreObjects; - -/** - * Describes a single update event in an EventuallyConsistentMap. - */ -final class UpdateEntry<K, V> { - private final K key; - private final MapValue<V> value; - - /** - * Creates a new update entry. - * - * @param key key of the entry - * @param value value of the entry - */ - public UpdateEntry(K key, MapValue<V> value) { - this.key = checkNotNull(key); - this.value = value; - } - - /** - * Returns the key. - * - * @return the key - */ - public K key() { - return key; - } - - /** - * Returns the value of the entry. - * - * @return the value - */ - public MapValue<V> value() { - return value; - } - - /** - * Returns if this entry is newer than other entry. - * @param other other entry - * @return true if this entry is newer; false otherwise - */ - public boolean isNewerThan(UpdateEntry<K, V> other) { - return other == null || other.value == null || (value != null && value.isNewerThan(other.value)); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("key", key()) - .add("value", value) - .toString(); - } - - @SuppressWarnings("unused") - private UpdateEntry() { - this.key = null; - this.value = null; - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/package-info.java deleted file mode 100644 index 81fd2868..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Distributed map with eventually-consistent update semantics and gossip - * based anti-entropy mechanism. - */ -package org.onosproject.store.ecmap;
\ No newline at end of file |