aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java679
1 files changed, 0 insertions, 679 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
deleted file mode 100644
index b5ea52e0..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.AbstractAccumulator;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.Timer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-
-/**
- * Distributed Map implementation which uses optimistic replication and gossip
- * based techniques to provide an eventually consistent data store.
- */
-public class EventuallyConsistentMapImpl<K, V>
- implements EventuallyConsistentMap<K, V> {
-
- private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
-
- private final Map<K, MapValue<V>> items;
-
- private final ClusterService clusterService;
- private final ClusterCommunicationService clusterCommunicator;
- private final KryoSerializer serializer;
- private final NodeId localNodeId;
- private final PersistenceService persistenceService;
-
- private final BiFunction<K, V, Timestamp> timestampProvider;
-
- private final MessageSubject updateMessageSubject;
- private final MessageSubject antiEntropyAdvertisementSubject;
-
- private final Set<EventuallyConsistentMapListener<K, V>> listeners
- = Sets.newCopyOnWriteArraySet();
-
- private final ExecutorService executor;
- private final ScheduledExecutorService backgroundExecutor;
- private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
-
- private final ExecutorService communicationExecutor;
- private final Map<NodeId, EventAccumulator> senderPending;
-
- private final String mapName;
-
- private volatile boolean destroyed = false;
- private static final String ERROR_DESTROYED = " map is already destroyed";
- private final String destroyedMessage;
-
- private static final String ERROR_NULL_KEY = "Key cannot be null";
- private static final String ERROR_NULL_VALUE = "Null values are not allowed";
-
- private final long initialDelaySec = 5;
- private final boolean lightweightAntiEntropy;
- private final boolean tombstonesDisabled;
-
- private static final int WINDOW_SIZE = 5;
- private static final int HIGH_LOAD_THRESHOLD = 0;
- private static final int LOAD_WINDOW = 2;
- private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
-
- private final boolean persistent;
-
- private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
-
-
- /**
- * Creates a new eventually consistent map shared amongst multiple instances.
- * <p>
- * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
- * for more description of the parameters expected by the map.
- * </p>
- *
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param timestampProvider provider of timestamps for K and V
- * @param peerUpdateFunction function that provides a set of nodes to immediately
- * update to when there writes to the map
- * @param eventExecutor executor to use for processing incoming
- * events from peers
- * @param communicationExecutor executor to use for sending events to peers
- * @param backgroundExecutor executor to use for background anti-entropy
- * tasks
- * @param tombstonesDisabled true if this map should not maintain
- * tombstones
- * @param antiEntropyPeriod period that the anti-entropy task should run
- * @param antiEntropyTimeUnit time unit for anti-entropy period
- * @param convergeFaster make anti-entropy try to converge faster
- * @param persistent persist data to disk
- */
- EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- BiFunction<K, V, Timestamp> timestampProvider,
- BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
- ExecutorService eventExecutor,
- ExecutorService communicationExecutor,
- ScheduledExecutorService backgroundExecutor,
- boolean tombstonesDisabled,
- long antiEntropyPeriod,
- TimeUnit antiEntropyTimeUnit,
- boolean convergeFaster,
- boolean persistent,
- PersistenceService persistenceService) {
- this.mapName = mapName;
- this.serializer = createSerializer(serializerBuilder);
- this.persistenceService = persistenceService;
- this.persistent =
- persistent;
- if (persistent) {
- items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
- .withName(PERSISTENT_LOCAL_MAP_NAME)
- .withSerializer(new Serializer() {
-
- @Override
- public <T> byte[] encode(T object) {
- return EventuallyConsistentMapImpl.this.serializer.encode(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
- }
- })
- .build();
- } else {
- items = Maps.newConcurrentMap();
- }
- senderPending = Maps.newConcurrentMap();
- destroyedMessage = mapName + ERROR_DESTROYED;
-
- this.clusterService = clusterService;
- this.clusterCommunicator = clusterCommunicator;
- this.localNodeId = clusterService.getLocalNode().id();
-
- this.timestampProvider = timestampProvider;
-
- if (peerUpdateFunction != null) {
- this.peerUpdateFunction = peerUpdateFunction;
- } else {
- this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
- .map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(localNodeId))
- .collect(Collectors.toList());
- }
-
- if (eventExecutor != null) {
- this.executor = eventExecutor;
- } else {
- // should be a normal executor; it's used for receiving messages
- this.executor =
- Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
- }
-
- if (communicationExecutor != null) {
- this.communicationExecutor = communicationExecutor;
- } else {
- // sending executor; should be capped
- //TODO this probably doesn't need to be bounded anymore
- this.communicationExecutor =
- newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
- }
-
-
- if (backgroundExecutor != null) {
- this.backgroundExecutor = backgroundExecutor;
- } else {
- this.backgroundExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
- }
-
- // start anti-entropy thread
- this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
- initialDelaySec, antiEntropyPeriod,
- antiEntropyTimeUnit);
-
- updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
- clusterCommunicator.addSubscriber(updateMessageSubject,
- serializer::decode,
- this::processUpdates,
- this.executor);
-
- antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
- clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- serializer::decode,
- this::handleAntiEntropyAdvertisement,
- this.backgroundExecutor);
-
- this.tombstonesDisabled = tombstonesDisabled;
- this.lightweightAntiEntropy = !convergeFaster;
- }
-
- private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
- return new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- // Add the map's internal helper classes to the user-supplied serializer
- serializerPool = builder
- .register(KryoNamespaces.BASIC)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(LogicalTimestamp.class)
- .register(WallClockTimestamp.class)
- .register(AntiEntropyAdvertisement.class)
- .register(UpdateEntry.class)
- .register(MapValue.class)
- .register(MapValue.Digest.class)
- .build();
- }
- };
- }
-
- @Override
- public int size() {
- checkState(!destroyed, destroyedMessage);
- // TODO: Maintain a separate counter for tracking live elements in map.
- return Maps.filterValues(items, MapValue::isAlive).size();
- }
-
- @Override
- public boolean isEmpty() {
- checkState(!destroyed, destroyedMessage);
- return size() == 0;
- }
-
- @Override
- public boolean containsKey(K key) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- return get(key) != null;
- }
-
- @Override
- public boolean containsValue(V value) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(value, ERROR_NULL_VALUE);
- return items.values()
- .stream()
- .filter(MapValue::isAlive)
- .anyMatch(v -> value.equals(v.get()));
- }
-
- @Override
- public V get(K key) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
-
- MapValue<V> value = items.get(key);
- return (value == null || value.isTombstone()) ? null : value.get();
- }
-
- @Override
- public void put(K key, V value) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
-
- MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
- if (putInternal(key, newValue)) {
- notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
- notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
- }
- }
-
- @Override
- public V remove(K key) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- return removeAndNotify(key, null);
- }
-
- @Override
- public void remove(K key, V value) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- removeAndNotify(key, value);
- }
-
- private V removeAndNotify(K key, V value) {
- Timestamp timestamp = timestampProvider.apply(key, value);
- Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
- ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
- MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
- if (previousValue != null) {
- notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
- peerUpdateFunction.apply(key, previousValue.get()));
- if (previousValue.isAlive()) {
- notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
- }
- }
- return previousValue != null ? previousValue.get() : null;
- }
-
- private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- tombstone.ifPresent(v -> checkState(v.isTombstone()));
-
- counter.incrementCount();
- AtomicBoolean updated = new AtomicBoolean(false);
- AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
- items.compute(key, (k, existing) -> {
- boolean valueMatches = true;
- if (value.isPresent() && existing != null && existing.isAlive()) {
- valueMatches = Objects.equals(value.get(), existing.get());
- }
- if (existing == null) {
- log.trace("ECMap Remove: Existing value for key {} is already null", k);
- }
- if (valueMatches) {
- if (existing == null) {
- updated.set(tombstone.isPresent());
- } else {
- updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
- }
- }
- if (updated.get()) {
- previousValue.set(existing);
- return tombstone.orElse(null);
- } else {
- return existing;
- }
- });
- return previousValue.get();
- }
-
- @Override
- public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(recomputeFunction, "Recompute function cannot be null");
-
- AtomicBoolean updated = new AtomicBoolean(false);
- AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
- MapValue<V> computedValue = items.compute(key, (k, mv) -> {
- previousValue.set(mv);
- V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
- MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
- if (mv == null || newValue.isNewerThan(mv)) {
- updated.set(true);
- return newValue;
- } else {
- return mv;
- }
- });
- if (updated.get()) {
- notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
- EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
- V value = computedValue.isTombstone()
- ? previousValue.get() == null ? null : previousValue.get().get()
- : computedValue.get();
- if (value != null) {
- notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
- }
- }
- return computedValue.get();
- }
-
- @Override
- public void putAll(Map<? extends K, ? extends V> m) {
- checkState(!destroyed, destroyedMessage);
- m.forEach(this::put);
- }
-
- @Override
- public void clear() {
- checkState(!destroyed, destroyedMessage);
- Maps.filterValues(items, MapValue::isAlive)
- .forEach((k, v) -> remove(k));
- }
-
- @Override
- public Set<K> keySet() {
- checkState(!destroyed, destroyedMessage);
- return Maps.filterValues(items, MapValue::isAlive)
- .keySet();
- }
-
- @Override
- public Collection<V> values() {
- checkState(!destroyed, destroyedMessage);
- return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
- }
-
- @Override
- public Set<Map.Entry<K, V>> entrySet() {
- checkState(!destroyed, destroyedMessage);
- return Maps.filterValues(items, MapValue::isAlive)
- .entrySet()
- .stream()
- .map(e -> Pair.of(e.getKey(), e.getValue().get()))
- .collect(Collectors.toSet());
- }
-
- /**
- * Returns true if newValue was accepted i.e. map is updated.
- *
- * @param key key
- * @param newValue proposed new value
- * @return true if update happened; false if map already contains a more recent value for the key
- */
- private boolean putInternal(K key, MapValue<V> newValue) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(newValue, ERROR_NULL_VALUE);
- checkState(newValue.isAlive());
- counter.incrementCount();
- AtomicBoolean updated = new AtomicBoolean(false);
- items.compute(key, (k, existing) -> {
- if (existing == null || newValue.isNewerThan(existing)) {
- updated.set(true);
- return newValue;
- }
- return existing;
- });
- return updated.get();
- }
-
- @Override
- public void addListener(EventuallyConsistentMapListener<K, V> listener) {
- checkState(!destroyed, destroyedMessage);
-
- listeners.add(checkNotNull(listener));
- }
-
- @Override
- public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
- checkState(!destroyed, destroyedMessage);
-
- listeners.remove(checkNotNull(listener));
- }
-
- @Override
- public void destroy() {
- destroyed = true;
-
- executor.shutdown();
- backgroundExecutor.shutdown();
- communicationExecutor.shutdown();
-
- listeners.clear();
-
- clusterCommunicator.removeSubscriber(updateMessageSubject);
- clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
- }
-
- private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
- listeners.forEach(listener -> listener.event(event));
- }
-
- private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
- queueUpdate(event, peers);
- }
-
- private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
- if (peers == null) {
- // we have no friends :(
- return;
- }
- peers.forEach(node ->
- senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
- );
- }
-
- private boolean underHighLoad() {
- return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
- }
-
- private void sendAdvertisement() {
- try {
- if (underHighLoad() || destroyed) {
- return;
- }
- pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
- } catch (Exception e) {
- // Catch all exceptions to avoid scheduled task being suppressed.
- log.error("Exception thrown while sending advertisement", e);
- }
- }
-
- private Optional<NodeId> pickRandomActivePeer() {
- List<NodeId> activePeers = clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .filter(id -> !localNodeId.equals(id))
- .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
- .collect(Collectors.toList());
- Collections.shuffle(activePeers);
- return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
- }
-
- private void sendAdvertisementToPeer(NodeId peer) {
- clusterCommunicator.unicast(createAdvertisement(),
- antiEntropyAdvertisementSubject,
- serializer::encode,
- peer)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
- }
- });
- }
-
- private AntiEntropyAdvertisement<K> createAdvertisement() {
- return new AntiEntropyAdvertisement<K>(localNodeId,
- ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
- }
-
- private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
- if (destroyed || underHighLoad()) {
- return;
- }
- try {
- if (log.isTraceEnabled()) {
- log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
- mapName, ad.sender(), ad.digest().size());
- }
- antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
-
- if (!lightweightAntiEntropy) {
- // if remote ad has any entries that the local copy is missing, actively sync
- // TODO: Missing keys is not the way local copy can be behind.
- if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
- // TODO: Send ad for missing keys and for entries that are stale
- sendAdvertisementToPeer(ad.sender());
- }
- }
- } catch (Exception e) {
- log.warn("Error handling anti-entropy advertisement", e);
- }
- }
-
- /**
- * Processes anti-entropy ad from peer by taking following actions:
- * 1. If peer has an old entry, updates peer.
- * 2. If peer indicates an entry is removed and has a more recent
- * timestamp than the local entry, update local state.
- */
- private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
- AntiEntropyAdvertisement<K> ad) {
- final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
- final NodeId sender = ad.sender();
- items.forEach((key, localValue) -> {
- MapValue.Digest remoteValueDigest = ad.digest().get(key);
- if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
- // local value is more recent, push to sender
- queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
- }
- if (remoteValueDigest != null
- && remoteValueDigest.isNewerThan(localValue.digest())
- && remoteValueDigest.isTombstone()) {
- MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
- MapValue<V> previousValue = removeInternal(key,
- Optional.empty(),
- Optional.of(tombstone));
- if (previousValue != null && previousValue.isAlive()) {
- externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
- }
- }
- });
- return externalEvents;
- }
-
- private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
- if (destroyed) {
- return;
- }
- updates.forEach(update -> {
- final K key = update.key();
- final MapValue<V> value = update.value();
- if (value == null || value.isTombstone()) {
- MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
- if (previousValue != null && previousValue.isAlive()) {
- notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
- }
- } else if (putInternal(key, value)) {
- notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
- }
- });
- }
-
- // TODO pull this into the class if this gets pulled out...
- private static final int DEFAULT_MAX_EVENTS = 1000;
- private static final int DEFAULT_MAX_IDLE_MS = 10;
- private static final int DEFAULT_MAX_BATCH_MS = 50;
- private static final Timer TIMER = new Timer("onos-ecm-sender-events");
-
- private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
-
- private final NodeId peer;
-
- private EventAccumulator(NodeId peer) {
- super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
- this.peer = peer;
- }
-
- @Override
- public void processItems(List<UpdateEntry<K, V>> items) {
- Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
- items.forEach(item -> map.compute(item.key(), (key, existing) ->
- item.isNewerThan(existing) ? item : existing));
- communicationExecutor.submit(() -> {
- clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
- updateMessageSubject,
- serializer::encode,
- peer)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to send to {}", peer, error);
- }
- });
- });
- }
- }
-}