aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java')
-rw-r--r--framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java914
1 files changed, 0 insertions, 914 deletions
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
deleted file mode 100644
index b74aa370..00000000
--- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.AbstractEvent;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.persistence.TestPersistenceService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.WallClockTimestamp;
-
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static junit.framework.TestCase.assertFalse;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Unit tests for EventuallyConsistentMapImpl.
- */
-public class EventuallyConsistentMapImplTest {
-
- private EventuallyConsistentMap<String, String> ecMap;
-
- private PersistenceService persistenceService;
- private ClusterService clusterService;
- private ClusterCommunicationService clusterCommunicator;
- private SequentialClockService<String, String> clockService;
-
- private static final String MAP_NAME = "test";
- private static final MessageSubject UPDATE_MESSAGE_SUBJECT
- = new MessageSubject("ecm-" + MAP_NAME + "-update");
- private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
- = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
-
- private static final String KEY1 = "one";
- private static final String KEY2 = "two";
- private static final String VALUE1 = "oneValue";
- private static final String VALUE2 = "twoValue";
-
- private final ControllerNode self =
- new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
-
- private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
- private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
-
- /*
- * Serialization is a bit tricky here. We need to serialize in the tests
- * to set the expectations, which will use this serializer here, but the
- * EventuallyConsistentMap will use its own internal serializer. This means
- * this serializer must be set up exactly the same as map's internal
- * serializer.
- */
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- // Classes we give to the map
- .register(KryoNamespaces.API)
- .register(TestTimestamp.class)
- // Below is the classes that the map internally registers
- .register(LogicalTimestamp.class)
- .register(WallClockTimestamp.class)
- .register(ArrayList.class)
- .register(AntiEntropyAdvertisement.class)
- .register(HashMap.class)
- .register(Optional.class)
- .build();
- }
- };
-
- @Before
- public void setUp() throws Exception {
- clusterService = createMock(ClusterService.class);
- expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
- expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
- replay(clusterService);
-
- clusterCommunicator = createMock(ClusterCommunicationService.class);
-
- persistenceService = new TestPersistenceService();
- // Add expectation for adding cluster message subscribers which
- // delegate to our ClusterCommunicationService implementation. This
- // allows us to get a reference to the map's internal cluster message
- // handlers so we can induce events coming in from a peer.
- clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
- anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
- expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
-
- replay(clusterCommunicator);
-
- clockService = new SequentialClockService<>();
-
- KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(TestTimestamp.class);
-
- ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
- clusterService, clusterCommunicator, persistenceService)
- .withName(MAP_NAME)
- .withSerializer(serializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
- .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
- .withPersistence()
- .build();
-
- // Reset ready for tests to add their own expectations
- reset(clusterCommunicator);
- }
-
- @After
- public void tearDown() {
- reset(clusterCommunicator);
- ecMap.destroy();
- }
-
- @SuppressWarnings("unchecked")
- private EventuallyConsistentMapListener<String, String> getListener() {
- return createMock(EventuallyConsistentMapListener.class);
- }
-
- @Test
- public void testSize() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertEquals(0, ecMap.size());
- ecMap.put(KEY1, VALUE1);
- assertEquals(1, ecMap.size());
- ecMap.put(KEY1, VALUE2);
- assertEquals(1, ecMap.size());
- ecMap.put(KEY2, VALUE2);
- assertEquals(2, ecMap.size());
- for (int i = 0; i < 10; i++) {
- ecMap.put("" + i, "" + i);
- }
- assertEquals(12, ecMap.size());
- ecMap.remove(KEY1);
- assertEquals(11, ecMap.size());
- ecMap.remove(KEY1);
- assertEquals(11, ecMap.size());
- }
-
- @Test
- public void testIsEmpty() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertTrue(ecMap.isEmpty());
- ecMap.put(KEY1, VALUE1);
- assertFalse(ecMap.isEmpty());
- ecMap.remove(KEY1);
- assertTrue(ecMap.isEmpty());
- }
-
- @Test
- public void testContainsKey() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertFalse(ecMap.containsKey(KEY1));
- ecMap.put(KEY1, VALUE1);
- assertTrue(ecMap.containsKey(KEY1));
- assertFalse(ecMap.containsKey(KEY2));
- ecMap.remove(KEY1);
- assertFalse(ecMap.containsKey(KEY1));
- }
-
- @Test
- public void testContainsValue() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertFalse(ecMap.containsValue(VALUE1));
- ecMap.put(KEY1, VALUE1);
- assertTrue(ecMap.containsValue(VALUE1));
- assertFalse(ecMap.containsValue(VALUE2));
- ecMap.put(KEY1, VALUE2);
- assertFalse(ecMap.containsValue(VALUE1));
- assertTrue(ecMap.containsValue(VALUE2));
- ecMap.remove(KEY1);
- assertFalse(ecMap.containsValue(VALUE2));
- }
-
- @Test
- public void testGet() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- CountDownLatch latch;
-
- // Local put
- assertNull(ecMap.get(KEY1));
- ecMap.put(KEY1, VALUE1);
- assertEquals(VALUE1, ecMap.get(KEY1));
-
- // Remote put
- List<UpdateEntry<String, String>> message
- = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
-
- // Create a latch so we know when the put operation has finished
- latch = new CountDownLatch(1);
- ecMap.addListener(new TestListener(latch));
-
- assertNull(ecMap.get(KEY2));
- updateHandler.accept(message);
- assertTrue("External listener never got notified of internal event",
- latch.await(100, TimeUnit.MILLISECONDS));
- assertEquals(VALUE2, ecMap.get(KEY2));
-
- // Local remove
- ecMap.remove(KEY2);
- assertNull(ecMap.get(KEY2));
-
- // Remote remove
- message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
-
- // Create a latch so we know when the remove operation has finished
- latch = new CountDownLatch(1);
- ecMap.addListener(new TestListener(latch));
-
- updateHandler.accept(message);
- assertTrue("External listener never got notified of internal event",
- latch.await(100, TimeUnit.MILLISECONDS));
- assertNull(ecMap.get(KEY1));
- }
-
- @Test
- public void testPut() throws Exception {
- // Set up expectations of external events to be sent to listeners during
- // the test. These don't use timestamps so we can set them all up at once.
- EventuallyConsistentMapListener<String, String> listener
- = getListener();
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2));
- replay(listener);
-
- ecMap.addListener(listener);
-
- // Set up expected internal message to be broadcast to peers on first put
- expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
- .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- // Put first value
- assertNull(ecMap.get(KEY1));
- ecMap.put(KEY1, VALUE1);
- assertEquals(VALUE1, ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
- // Set up expected internal message to be broadcast to peers on second put
- expectSpecificMulticastMessage(generatePutMessage(
- KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- // Update same key to a new value
- ecMap.put(KEY1, VALUE2);
- assertEquals(VALUE2, ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
- // Do a put with a older timestamp than the value already there.
- // The map data should not be changed and no notifications should be sent.
- reset(clusterCommunicator);
- replay(clusterCommunicator);
-
- clockService.turnBackTime();
- ecMap.put(KEY1, VALUE1);
- // Value should not have changed.
- assertEquals(VALUE2, ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
- // Check that our listener received the correct events during the test
- verify(listener);
- }
-
- @Test
- public void testRemove() throws Exception {
- // Set up expectations of external events to be sent to listeners during
- // the test. These don't use timestamps so we can set them all up at once.
- EventuallyConsistentMapListener<String, String> listener
- = getListener();
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
- replay(listener);
-
- ecMap.addListener(listener);
-
- // Put in an initial value
- expectPeerMessage(clusterCommunicator);
- ecMap.put(KEY1, VALUE1);
- assertEquals(VALUE1, ecMap.get(KEY1));
-
- // Remove the value and check the correct internal cluster messages
- // are sent
- expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- ecMap.remove(KEY1);
- assertNull(ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
- // Remove the same value again. Even though the value is no longer in
- // the map, we expect that the tombstone is updated and another remove
- // event is sent to the cluster and external listeners.
- expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- ecMap.remove(KEY1);
- assertNull(ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
-
- // Put in a new value for us to try and remove
- expectPeerMessage(clusterCommunicator);
-
- ecMap.put(KEY2, VALUE2);
-
- clockService.turnBackTime();
-
- // Remove should have no effect, since it has an older timestamp than
- // the put. Expect no notifications to be sent out
- reset(clusterCommunicator);
- replay(clusterCommunicator);
-
- ecMap.remove(KEY2);
-
- verify(clusterCommunicator);
-
- // Check that our listener received the correct events during the test
- verify(listener);
- }
-
- @Test
- public void testCompute() throws Exception {
- // Set up expectations of external events to be sent to listeners during
- // the test. These don't use timestamps so we can set them all up at once.
- EventuallyConsistentMapListener<String, String> listener
- = getListener();
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
- replay(listener);
-
- ecMap.addListener(listener);
-
- // Put in an initial value
- expectPeerMessage(clusterCommunicator);
- ecMap.compute(KEY1, (k, v) -> VALUE1);
- assertEquals(VALUE1, ecMap.get(KEY1));
-
- // Remove the value and check the correct internal cluster messages
- // are sent
- expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- ecMap.compute(KEY1, (k, v) -> null);
- assertNull(ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
- // Remove the same value again. Even though the value is no longer in
- // the map, we expect that the tombstone is updated and another remove
- // event is sent to the cluster and external listeners.
- expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- ecMap.compute(KEY1, (k, v) -> null);
- assertNull(ecMap.get(KEY1));
-
- verify(clusterCommunicator);
-
- // Put in a new value for us to try and remove
- expectPeerMessage(clusterCommunicator);
-
- ecMap.compute(KEY2, (k, v) -> VALUE2);
-
- clockService.turnBackTime();
-
- // Remove should have no effect, since it has an older timestamp than
- // the put. Expect no notifications to be sent out
- reset(clusterCommunicator);
- replay(clusterCommunicator);
-
- ecMap.compute(KEY2, (k, v) -> null);
-
- verify(clusterCommunicator);
-
- // Check that our listener received the correct events during the test
- verify(listener);
- }
-
- @Test
- public void testPutAll() throws Exception {
- // putAll() with an empty map is a no-op - no messages will be sent
- reset(clusterCommunicator);
- replay(clusterCommunicator);
-
- ecMap.putAll(new HashMap<>());
-
- verify(clusterCommunicator);
-
- // Set up the listener with our expected events
- EventuallyConsistentMapListener<String, String> listener
- = getListener();
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
- replay(listener);
-
- ecMap.addListener(listener);
-
- // Expect a multi-update inter-instance message
- expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
- clusterCommunicator);
-
- Map<String, String> putAllValues = new HashMap<>();
- putAllValues.put(KEY1, VALUE1);
- putAllValues.put(KEY2, VALUE2);
-
- // Put the values in the map
- ecMap.putAll(putAllValues);
-
- // Check the correct messages and events were sent
- verify(clusterCommunicator);
- verify(listener);
- }
-
- @Test
- public void testClear() throws Exception {
- EventuallyConsistentMapListener<String, String> listener
- = getListener();
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
- listener.event(new EventuallyConsistentMapEvent<>(
- MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
- replay(listener);
-
- // clear() on an empty map is a no-op - no messages will be sent
- reset(clusterCommunicator);
- replay(clusterCommunicator);
-
- assertTrue(ecMap.isEmpty());
- ecMap.clear();
- verify(clusterCommunicator);
-
- // Put some items in the map
- expectPeerMessage(clusterCommunicator);
- ecMap.put(KEY1, VALUE1);
- ecMap.put(KEY2, VALUE2);
-
- ecMap.addListener(listener);
- expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
-
- ecMap.clear();
-
- verify(clusterCommunicator);
- verify(listener);
- }
-
- @Test
- public void testKeySet() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertTrue(ecMap.keySet().isEmpty());
-
- // Generate some keys
- Set<String> keys = new HashSet<>();
- for (int i = 1; i <= 10; i++) {
- keys.add("" + i);
- }
-
- // Put each key in the map
- keys.forEach(k -> ecMap.put(k, "value" + k));
-
- // Check keySet() returns the correct value
- assertEquals(keys, ecMap.keySet());
-
- // Update the value for one of the keys
- ecMap.put(keys.iterator().next(), "new-value");
-
- // Check the key set is still the same
- assertEquals(keys, ecMap.keySet());
-
- // Remove a key
- String removeKey = keys.iterator().next();
- keys.remove(removeKey);
- ecMap.remove(removeKey);
-
- // Check the key set is still correct
- assertEquals(keys, ecMap.keySet());
- }
-
- @Test
- public void testValues() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertTrue(ecMap.values().isEmpty());
-
- // Generate some values
- Map<String, String> expectedValues = new HashMap<>();
- for (int i = 1; i <= 10; i++) {
- expectedValues.put("" + i, "value" + i);
- }
-
- // Add them into the map
- expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
-
- // Check the values collection is correct
- assertEquals(expectedValues.values().size(), ecMap.values().size());
- expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
-
- // Update the value for one of the keys
- Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
- expectedValues.put(first.getKey(), "new-value");
- ecMap.put(first.getKey(), "new-value");
-
- // Check the values collection is still correct
- assertEquals(expectedValues.values().size(), ecMap.values().size());
- expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
-
- // Remove a key
- String removeKey = expectedValues.keySet().iterator().next();
- expectedValues.remove(removeKey);
- ecMap.remove(removeKey);
-
- // Check the values collection is still correct
- assertEquals(expectedValues.values().size(), ecMap.values().size());
- expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v)));
- }
-
- @Test
- public void testEntrySet() throws Exception {
- expectPeerMessage(clusterCommunicator);
-
- assertTrue(ecMap.entrySet().isEmpty());
-
- // Generate some values
- Map<String, String> expectedValues = new HashMap<>();
- for (int i = 1; i <= 10; i++) {
- expectedValues.put("" + i, "value" + i);
- }
-
- // Add them into the map
- expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue()));
-
- // Check the entry set is correct
- assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
-
- // Update the value for one of the keys
- Map.Entry<String, String> first = expectedValues.entrySet().iterator().next();
- expectedValues.put(first.getKey(), "new-value");
- ecMap.put(first.getKey(), "new-value");
-
- // Check the entry set is still correct
- assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
-
- // Remove a key
- String removeKey = expectedValues.keySet().iterator().next();
- expectedValues.remove(removeKey);
- ecMap.remove(removeKey);
-
- // Check the entry set is still correct
- assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet()));
- }
-
- private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) {
- if (expectedMap.entrySet().size() != actual.size()) {
- return false;
- }
-
- for (Map.Entry<String, String> e : actual) {
- if (!expectedMap.containsKey(e.getKey())) {
- return false;
- }
- if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) {
- return false;
- }
- }
- return true;
- }
-
- @Test
- public void testDestroy() throws Exception {
- clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
- clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
-
- replay(clusterCommunicator);
-
- ecMap.destroy();
-
- verify(clusterCommunicator);
-
- try {
- ecMap.get(KEY1);
- fail("get after destroy should throw exception");
- } catch (IllegalStateException e) {
- assertTrue(true);
- }
-
- try {
- ecMap.put(KEY1, VALUE1);
- fail("put after destroy should throw exception");
- } catch (IllegalStateException e) {
- assertTrue(true);
- }
- }
-
- private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
- return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
- }
-
- private List<UpdateEntry<String, String>> generatePutMessage(
- String key1, String value1, String key2, String value2) {
- List<UpdateEntry<String, String>> list = new ArrayList<>();
-
- Timestamp timestamp1 = clockService.peek(1);
- Timestamp timestamp2 = clockService.peek(2);
-
- list.add(generatePutMessage(key1, value1, timestamp1));
- list.add(generatePutMessage(key2, value2, timestamp2));
-
- return list;
- }
-
- private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
- return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
- }
-
- private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
- List<UpdateEntry<String, String>> list = new ArrayList<>();
-
- Timestamp timestamp1 = clockService.peek(1);
- Timestamp timestamp2 = clockService.peek(2);
-
- list.add(generateRemoveMessage(key1, timestamp1));
- list.add(generateRemoveMessage(key2, timestamp2));
-
- return list;
- }
-
- /**
- * Sets up a mock ClusterCommunicationService to expect a specific cluster
- * message to be broadcast to the cluster.
- *
- * @param message message we expect to be sent
- * @param clusterCommunicator a mock ClusterCommunicationService to set up
- */
- //FIXME rename
- private static <T> void expectSpecificBroadcastMessage(
- T message,
- MessageSubject subject,
- ClusterCommunicationService clusterCommunicator) {
- reset(clusterCommunicator);
- clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
- expectLastCall().anyTimes();
- replay(clusterCommunicator);
- }
-
- /**
- * Sets up a mock ClusterCommunicationService to expect a specific cluster
- * message to be multicast to the cluster.
- *
- * @param message message we expect to be sent
- * @param subject subject we expect to be sent to
- * @param clusterCommunicator a mock ClusterCommunicationService to set up
- */
- //FIXME rename
- private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
- ClusterCommunicationService clusterCommunicator) {
- reset(clusterCommunicator);
- clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
- expectLastCall().anyTimes();
- replay(clusterCommunicator);
- }
-
-
- /**
- * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
- * that is sent to it. This is useful for unit tests where we aren't
- * interested in testing the messaging component.
- *
- * @param clusterCommunicator a mock ClusterCommunicationService to set up
- */
- //FIXME rename
- private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
- reset(clusterCommunicator);
-// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
-// anyObject(Iterable.class)))
- expect(clusterCommunicator.<T>unicast(
- anyObject(),
- anyObject(MessageSubject.class),
- anyObject(Function.class),
- anyObject(NodeId.class)))
- .andReturn(CompletableFuture.completedFuture(null))
- .anyTimes();
- replay(clusterCommunicator);
- }
-
- /**
- * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
- * that is sent to it. This is useful for unit tests where we aren't
- * interested in testing the messaging component.
- *
- * @param clusterCommunicator a mock ClusterCommunicationService to set up
- */
- private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
- reset(clusterCommunicator);
- clusterCommunicator.<AbstractEvent>multicast(
- anyObject(AbstractEvent.class),
- anyObject(MessageSubject.class),
- anyObject(Function.class),
- anyObject(Set.class));
- expectLastCall().anyTimes();
- replay(clusterCommunicator);
- }
-
- /**
- * ClusterCommunicationService implementation that the map's addSubscriber
- * call will delegate to. This means we can get a reference to the
- * internal cluster message handler used by the map, so that we can simulate
- * events coming in from other instances.
- */
- private final class TestClusterCommunicationService
- extends ClusterCommunicationServiceAdapter {
-
- @Override
- public <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder, Consumer<M> handler,
- Executor executor) {
- if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
- updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
- } else {
- throw new RuntimeException("Unexpected message subject " + subject.toString());
- }
- }
- }
-
- /**
- * ClockService implementation that gives out timestamps based on a
- * sequential counter. This clock service enables more control over the
- * timestamps that are given out, including being able to "turn back time"
- * to give out timestamps from the past.
- *
- * @param <T> Type that the clock service will give out timestamps for
- * @param <U> Second type that the clock service will give out values for
- */
- private class SequentialClockService<T, U> {
-
- private static final long INITIAL_VALUE = 1;
- private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
-
- public Timestamp getTimestamp(T object, U object2) {
- return new TestTimestamp(counter.getAndIncrement());
- }
-
- /**
- * Returns what the next timestamp will be without consuming the
- * timestamp. This allows test code to set expectations correctly while
- * still allowing the CUT to get the same timestamp.
- *
- * @return timestamp equal to the timestamp that will be returned by the
- * next call to {@link #getTimestamp(T, U)}.
- */
- public Timestamp peekAtNextTimestamp() {
- return peek(1);
- }
-
- /**
- * Returns the ith timestamp to be given out in the future without
- * consuming the timestamp. For example, i=1 returns the next timestamp,
- * i=2 returns the timestamp after that, and so on.
- *
- * @param i number of the timestamp to peek at
- * @return the ith timestamp that will be given out
- */
- public Timestamp peek(int i) {
- checkArgument(i > 0, "i must be a positive integer");
-
- return new TestTimestamp(counter.get() + i - 1);
- }
-
- /**
- * Turns the clock back two ticks, so the next call to getTimestamp will
- * return an older timestamp than the previous call to getTimestamp.
- */
- public void turnBackTime() {
- // Not atomic, but should be OK for these tests.
- counter.decrementAndGet();
- counter.decrementAndGet();
- }
-
- }
-
- /**
- * Timestamp implementation where the value of the timestamp can be
- * specified explicitly at creation time.
- */
- private class TestTimestamp implements Timestamp {
-
- private final long timestamp;
-
- /**
- * Creates a new timestamp that has the specified value.
- *
- * @param timestamp value of the timestamp
- */
- public TestTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @Override
- public int compareTo(Timestamp o) {
- checkArgument(o instanceof TestTimestamp);
- TestTimestamp otherTimestamp = (TestTimestamp) o;
- return ComparisonChain.start()
- .compare(this.timestamp, otherTimestamp.timestamp)
- .result();
- }
- }
-
- /**
- * EventuallyConsistentMapListener implementation which triggers a latch
- * when it receives an event.
- */
- private class TestListener implements EventuallyConsistentMapListener<String, String> {
- private CountDownLatch latch;
-
- /**
- * Creates a new listener that will trigger the specified latch when it
- * receives and event.
- *
- * @param latch the latch to trigger on events
- */
- public TestListener(CountDownLatch latch) {
- this.latch = latch;
- }
-
- @Override
- public void event(EventuallyConsistentMapEvent<String, String> event) {
- latch.countDown();
- }
- }
-}