diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java | 909 |
1 files changed, 909 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java new file mode 100644 index 00000000..ccf6ee71 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java @@ -0,0 +1,909 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.ecmap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.packet.IpAddress; +import org.onlab.util.KryoNamespace; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.cluster.DefaultControllerNode; +import org.onosproject.cluster.NodeId; +import org.onosproject.event.AbstractEvent; +import org.onosproject.store.Timestamp; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.impl.LogicalTimestamp; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.serializers.KryoSerializer; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.WallClockTimestamp; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; + +import static com.google.common.base.Preconditions.checkArgument; +import static junit.framework.TestCase.assertFalse; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Unit tests for EventuallyConsistentMapImpl. + */ +public class EventuallyConsistentMapImplTest { + + private EventuallyConsistentMap<String, String> ecMap; + + private ClusterService clusterService; + private ClusterCommunicationService clusterCommunicator; + private SequentialClockService<String, String> clockService; + + private static final String MAP_NAME = "test"; + private static final MessageSubject UPDATE_MESSAGE_SUBJECT + = new MessageSubject("ecm-" + MAP_NAME + "-update"); + private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT + = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy"); + + private static final String KEY1 = "one"; + private static final String KEY2 = "two"; + private static final String VALUE1 = "oneValue"; + private static final String VALUE2 = "twoValue"; + + private final ControllerNode self = + new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1)); + + private Consumer<Collection<UpdateEntry<String, String>>> updateHandler; + private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler; + + /* + * Serialization is a bit tricky here. We need to serialize in the tests + * to set the expectations, which will use this serializer here, but the + * EventuallyConsistentMap will use its own internal serializer. This means + * this serializer must be set up exactly the same as map's internal + * serializer. + */ + private static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespace.newBuilder() + // Classes we give to the map + .register(KryoNamespaces.API) + .register(TestTimestamp.class) + // Below is the classes that the map internally registers + .register(LogicalTimestamp.class) + .register(WallClockTimestamp.class) + .register(ArrayList.class) + .register(AntiEntropyAdvertisement.class) + .register(HashMap.class) + .register(Optional.class) + .build(); + } + }; + + @Before + public void setUp() throws Exception { + clusterService = createMock(ClusterService.class); + expect(clusterService.getLocalNode()).andReturn(self).anyTimes(); + expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes(); + replay(clusterService); + + clusterCommunicator = createMock(ClusterCommunicationService.class); + + // Add expectation for adding cluster message subscribers which + // delegate to our ClusterCommunicationService implementation. This + // allows us to get a reference to the map's internal cluster message + // handlers so we can induce events coming in from a peer. + clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class), + anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class)); + expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2); + + replay(clusterCommunicator); + + clockService = new SequentialClockService<>(); + + KryoNamespace.Builder serializer = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(TestTimestamp.class); + + ecMap = new EventuallyConsistentMapBuilderImpl<String, String>( + clusterService, clusterCommunicator) + .withName(MAP_NAME) + .withSerializer(serializer) + .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v)) + .withCommunicationExecutor(MoreExecutors.newDirectExecutorService()) + .build(); + + // Reset ready for tests to add their own expectations + reset(clusterCommunicator); + } + + @After + public void tearDown() { + reset(clusterCommunicator); + ecMap.destroy(); + } + + @SuppressWarnings("unchecked") + private EventuallyConsistentMapListener<String, String> getListener() { + return createMock(EventuallyConsistentMapListener.class); + } + + @Test + public void testSize() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertEquals(0, ecMap.size()); + ecMap.put(KEY1, VALUE1); + assertEquals(1, ecMap.size()); + ecMap.put(KEY1, VALUE2); + assertEquals(1, ecMap.size()); + ecMap.put(KEY2, VALUE2); + assertEquals(2, ecMap.size()); + for (int i = 0; i < 10; i++) { + ecMap.put("" + i, "" + i); + } + assertEquals(12, ecMap.size()); + ecMap.remove(KEY1); + assertEquals(11, ecMap.size()); + ecMap.remove(KEY1); + assertEquals(11, ecMap.size()); + } + + @Test + public void testIsEmpty() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertTrue(ecMap.isEmpty()); + ecMap.put(KEY1, VALUE1); + assertFalse(ecMap.isEmpty()); + ecMap.remove(KEY1); + assertTrue(ecMap.isEmpty()); + } + + @Test + public void testContainsKey() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertFalse(ecMap.containsKey(KEY1)); + ecMap.put(KEY1, VALUE1); + assertTrue(ecMap.containsKey(KEY1)); + assertFalse(ecMap.containsKey(KEY2)); + ecMap.remove(KEY1); + assertFalse(ecMap.containsKey(KEY1)); + } + + @Test + public void testContainsValue() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertFalse(ecMap.containsValue(VALUE1)); + ecMap.put(KEY1, VALUE1); + assertTrue(ecMap.containsValue(VALUE1)); + assertFalse(ecMap.containsValue(VALUE2)); + ecMap.put(KEY1, VALUE2); + assertFalse(ecMap.containsValue(VALUE1)); + assertTrue(ecMap.containsValue(VALUE2)); + ecMap.remove(KEY1); + assertFalse(ecMap.containsValue(VALUE2)); + } + + @Test + public void testGet() throws Exception { + expectPeerMessage(clusterCommunicator); + + CountDownLatch latch; + + // Local put + assertNull(ecMap.get(KEY1)); + ecMap.put(KEY1, VALUE1); + assertEquals(VALUE1, ecMap.get(KEY1)); + + // Remote put + List<UpdateEntry<String, String>> message + = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2))); + + // Create a latch so we know when the put operation has finished + latch = new CountDownLatch(1); + ecMap.addListener(new TestListener(latch)); + + assertNull(ecMap.get(KEY2)); + updateHandler.accept(message); + assertTrue("External listener never got notified of internal event", + latch.await(100, TimeUnit.MILLISECONDS)); + assertEquals(VALUE2, ecMap.get(KEY2)); + + // Local remove + ecMap.remove(KEY2); + assertNull(ecMap.get(KEY2)); + + // Remote remove + message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1))); + + // Create a latch so we know when the remove operation has finished + latch = new CountDownLatch(1); + ecMap.addListener(new TestListener(latch)); + + updateHandler.accept(message); + assertTrue("External listener never got notified of internal event", + latch.await(100, TimeUnit.MILLISECONDS)); + assertNull(ecMap.get(KEY1)); + } + + @Test + public void testPut() throws Exception { + // Set up expectations of external events to be sent to listeners during + // the test. These don't use timestamps so we can set them all up at once. + EventuallyConsistentMapListener<String, String> listener + = getListener(); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE2)); + replay(listener); + + ecMap.addListener(listener); + + // Set up expected internal message to be broadcast to peers on first put + expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService + .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + // Put first value + assertNull(ecMap.get(KEY1)); + ecMap.put(KEY1, VALUE1); + assertEquals(VALUE1, ecMap.get(KEY1)); + + verify(clusterCommunicator); + + // Set up expected internal message to be broadcast to peers on second put + expectSpecificMulticastMessage(generatePutMessage( + KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + // Update same key to a new value + ecMap.put(KEY1, VALUE2); + assertEquals(VALUE2, ecMap.get(KEY1)); + + verify(clusterCommunicator); + + // Do a put with a older timestamp than the value already there. + // The map data should not be changed and no notifications should be sent. + reset(clusterCommunicator); + replay(clusterCommunicator); + + clockService.turnBackTime(); + ecMap.put(KEY1, VALUE1); + // Value should not have changed. + assertEquals(VALUE2, ecMap.get(KEY1)); + + verify(clusterCommunicator); + + // Check that our listener received the correct events during the test + verify(listener); + } + + @Test + public void testRemove() throws Exception { + // Set up expectations of external events to be sent to listeners during + // the test. These don't use timestamps so we can set them all up at once. + EventuallyConsistentMapListener<String, String> listener + = getListener(); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2)); + replay(listener); + + ecMap.addListener(listener); + + // Put in an initial value + expectPeerMessage(clusterCommunicator); + ecMap.put(KEY1, VALUE1); + assertEquals(VALUE1, ecMap.get(KEY1)); + + // Remove the value and check the correct internal cluster messages + // are sent + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), + UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + ecMap.remove(KEY1); + assertNull(ecMap.get(KEY1)); + + verify(clusterCommunicator); + + // Remove the same value again. Even though the value is no longer in + // the map, we expect that the tombstone is updated and another remove + // event is sent to the cluster and external listeners. + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), + UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + ecMap.remove(KEY1); + assertNull(ecMap.get(KEY1)); + + verify(clusterCommunicator); + + + // Put in a new value for us to try and remove + expectPeerMessage(clusterCommunicator); + + ecMap.put(KEY2, VALUE2); + + clockService.turnBackTime(); + + // Remove should have no effect, since it has an older timestamp than + // the put. Expect no notifications to be sent out + reset(clusterCommunicator); + replay(clusterCommunicator); + + ecMap.remove(KEY2); + + verify(clusterCommunicator); + + // Check that our listener received the correct events during the test + verify(listener); + } + + @Test + public void testCompute() throws Exception { + // Set up expectations of external events to be sent to listeners during + // the test. These don't use timestamps so we can set them all up at once. + EventuallyConsistentMapListener<String, String> listener + = getListener(); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2)); + replay(listener); + + ecMap.addListener(listener); + + // Put in an initial value + expectPeerMessage(clusterCommunicator); + ecMap.compute(KEY1, (k, v) -> VALUE1); + assertEquals(VALUE1, ecMap.get(KEY1)); + + // Remove the value and check the correct internal cluster messages + // are sent + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), + UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + ecMap.compute(KEY1, (k, v) -> null); + assertNull(ecMap.get(KEY1)); + + verify(clusterCommunicator); + + // Remove the same value again. Even though the value is no longer in + // the map, we expect that the tombstone is updated and another remove + // event is sent to the cluster and external listeners. + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), + UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + ecMap.compute(KEY1, (k, v) -> null); + assertNull(ecMap.get(KEY1)); + + verify(clusterCommunicator); + + // Put in a new value for us to try and remove + expectPeerMessage(clusterCommunicator); + + ecMap.compute(KEY2, (k, v) -> VALUE2); + + clockService.turnBackTime(); + + // Remove should have no effect, since it has an older timestamp than + // the put. Expect no notifications to be sent out + reset(clusterCommunicator); + replay(clusterCommunicator); + + ecMap.compute(KEY2, (k, v) -> null); + + verify(clusterCommunicator); + + // Check that our listener received the correct events during the test + verify(listener); + } + + @Test + public void testPutAll() throws Exception { + // putAll() with an empty map is a no-op - no messages will be sent + reset(clusterCommunicator); + replay(clusterCommunicator); + + ecMap.putAll(new HashMap<>()); + + verify(clusterCommunicator); + + // Set up the listener with our expected events + EventuallyConsistentMapListener<String, String> listener + = getListener(); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2)); + replay(listener); + + ecMap.addListener(listener); + + // Expect a multi-update inter-instance message + expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT, + clusterCommunicator); + + Map<String, String> putAllValues = new HashMap<>(); + putAllValues.put(KEY1, VALUE1); + putAllValues.put(KEY2, VALUE2); + + // Put the values in the map + ecMap.putAll(putAllValues); + + // Check the correct messages and events were sent + verify(clusterCommunicator); + verify(listener); + } + + @Test + public void testClear() throws Exception { + EventuallyConsistentMapListener<String, String> listener + = getListener(); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1)); + listener.event(new EventuallyConsistentMapEvent<>( + MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2)); + replay(listener); + + // clear() on an empty map is a no-op - no messages will be sent + reset(clusterCommunicator); + replay(clusterCommunicator); + + assertTrue(ecMap.isEmpty()); + ecMap.clear(); + verify(clusterCommunicator); + + // Put some items in the map + expectPeerMessage(clusterCommunicator); + ecMap.put(KEY1, VALUE1); + ecMap.put(KEY2, VALUE2); + + ecMap.addListener(listener); + expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator); + + ecMap.clear(); + + verify(clusterCommunicator); + verify(listener); + } + + @Test + public void testKeySet() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertTrue(ecMap.keySet().isEmpty()); + + // Generate some keys + Set<String> keys = new HashSet<>(); + for (int i = 1; i <= 10; i++) { + keys.add("" + i); + } + + // Put each key in the map + keys.forEach(k -> ecMap.put(k, "value" + k)); + + // Check keySet() returns the correct value + assertEquals(keys, ecMap.keySet()); + + // Update the value for one of the keys + ecMap.put(keys.iterator().next(), "new-value"); + + // Check the key set is still the same + assertEquals(keys, ecMap.keySet()); + + // Remove a key + String removeKey = keys.iterator().next(); + keys.remove(removeKey); + ecMap.remove(removeKey); + + // Check the key set is still correct + assertEquals(keys, ecMap.keySet()); + } + + @Test + public void testValues() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertTrue(ecMap.values().isEmpty()); + + // Generate some values + Map<String, String> expectedValues = new HashMap<>(); + for (int i = 1; i <= 10; i++) { + expectedValues.put("" + i, "value" + i); + } + + // Add them into the map + expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue())); + + // Check the values collection is correct + assertEquals(expectedValues.values().size(), ecMap.values().size()); + expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v))); + + // Update the value for one of the keys + Map.Entry<String, String> first = expectedValues.entrySet().iterator().next(); + expectedValues.put(first.getKey(), "new-value"); + ecMap.put(first.getKey(), "new-value"); + + // Check the values collection is still correct + assertEquals(expectedValues.values().size(), ecMap.values().size()); + expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v))); + + // Remove a key + String removeKey = expectedValues.keySet().iterator().next(); + expectedValues.remove(removeKey); + ecMap.remove(removeKey); + + // Check the values collection is still correct + assertEquals(expectedValues.values().size(), ecMap.values().size()); + expectedValues.values().forEach(v -> assertTrue(ecMap.values().contains(v))); + } + + @Test + public void testEntrySet() throws Exception { + expectPeerMessage(clusterCommunicator); + + assertTrue(ecMap.entrySet().isEmpty()); + + // Generate some values + Map<String, String> expectedValues = new HashMap<>(); + for (int i = 1; i <= 10; i++) { + expectedValues.put("" + i, "value" + i); + } + + // Add them into the map + expectedValues.entrySet().forEach(e -> ecMap.put(e.getKey(), e.getValue())); + + // Check the entry set is correct + assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet())); + + // Update the value for one of the keys + Map.Entry<String, String> first = expectedValues.entrySet().iterator().next(); + expectedValues.put(first.getKey(), "new-value"); + ecMap.put(first.getKey(), "new-value"); + + // Check the entry set is still correct + assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet())); + + // Remove a key + String removeKey = expectedValues.keySet().iterator().next(); + expectedValues.remove(removeKey); + ecMap.remove(removeKey); + + // Check the entry set is still correct + assertTrue(entrySetsAreEqual(expectedValues, ecMap.entrySet())); + } + + private static boolean entrySetsAreEqual(Map<String, String> expectedMap, Set<Map.Entry<String, String>> actual) { + if (expectedMap.entrySet().size() != actual.size()) { + return false; + } + + for (Map.Entry<String, String> e : actual) { + if (!expectedMap.containsKey(e.getKey())) { + return false; + } + if (!Objects.equals(expectedMap.get(e.getKey()), e.getValue())) { + return false; + } + } + return true; + } + + @Test + public void testDestroy() throws Exception { + clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT); + clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT); + + replay(clusterCommunicator); + + ecMap.destroy(); + + verify(clusterCommunicator); + + try { + ecMap.get(KEY1); + fail("get after destroy should throw exception"); + } catch (IllegalStateException e) { + assertTrue(true); + } + + try { + ecMap.put(KEY1, VALUE1); + fail("put after destroy should throw exception"); + } catch (IllegalStateException e) { + assertTrue(true); + } + } + + private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) { + return new UpdateEntry<>(key, new MapValue<>(value, timestamp)); + } + + private List<UpdateEntry<String, String>> generatePutMessage( + String key1, String value1, String key2, String value2) { + List<UpdateEntry<String, String>> list = new ArrayList<>(); + + Timestamp timestamp1 = clockService.peek(1); + Timestamp timestamp2 = clockService.peek(2); + + list.add(generatePutMessage(key1, value1, timestamp1)); + list.add(generatePutMessage(key2, value2, timestamp2)); + + return list; + } + + private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) { + return new UpdateEntry<>(key, new MapValue<>(null, timestamp)); + } + + private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) { + List<UpdateEntry<String, String>> list = new ArrayList<>(); + + Timestamp timestamp1 = clockService.peek(1); + Timestamp timestamp2 = clockService.peek(2); + + list.add(generateRemoveMessage(key1, timestamp1)); + list.add(generateRemoveMessage(key2, timestamp2)); + + return list; + } + + /** + * Sets up a mock ClusterCommunicationService to expect a specific cluster + * message to be broadcast to the cluster. + * + * @param message message we expect to be sent + * @param clusterCommunicator a mock ClusterCommunicationService to set up + */ + //FIXME rename + private static <T> void expectSpecificBroadcastMessage( + T message, + MessageSubject subject, + ClusterCommunicationService clusterCommunicator) { + reset(clusterCommunicator); + clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class)); + expectLastCall().anyTimes(); + replay(clusterCommunicator); + } + + /** + * Sets up a mock ClusterCommunicationService to expect a specific cluster + * message to be multicast to the cluster. + * + * @param message message we expect to be sent + * @param subject subject we expect to be sent to + * @param clusterCommunicator a mock ClusterCommunicationService to set up + */ + //FIXME rename + private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject, + ClusterCommunicationService clusterCommunicator) { + reset(clusterCommunicator); + clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class)); + expectLastCall().anyTimes(); + replay(clusterCommunicator); + } + + + /** + * Sets up a mock ClusterCommunicationService to expect a multicast cluster message + * that is sent to it. This is useful for unit tests where we aren't + * interested in testing the messaging component. + * + * @param clusterCommunicator a mock ClusterCommunicationService to set up + */ + //FIXME rename + private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) { + reset(clusterCommunicator); +// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class), +// anyObject(Iterable.class))) + expect(clusterCommunicator.<T>unicast( + anyObject(), + anyObject(MessageSubject.class), + anyObject(Function.class), + anyObject(NodeId.class))) + .andReturn(CompletableFuture.completedFuture(null)) + .anyTimes(); + replay(clusterCommunicator); + } + + /** + * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message + * that is sent to it. This is useful for unit tests where we aren't + * interested in testing the messaging component. + * + * @param clusterCommunicator a mock ClusterCommunicationService to set up + */ + private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) { + reset(clusterCommunicator); + clusterCommunicator.<AbstractEvent>multicast( + anyObject(AbstractEvent.class), + anyObject(MessageSubject.class), + anyObject(Function.class), + anyObject(Set.class)); + expectLastCall().anyTimes(); + replay(clusterCommunicator); + } + + /** + * ClusterCommunicationService implementation that the map's addSubscriber + * call will delegate to. This means we can get a reference to the + * internal cluster message handler used by the map, so that we can simulate + * events coming in from other instances. + */ + private final class TestClusterCommunicationService + extends ClusterCommunicationServiceAdapter { + + @Override + public <M> void addSubscriber(MessageSubject subject, + Function<byte[], M> decoder, Consumer<M> handler, + Executor executor) { + if (subject.equals(UPDATE_MESSAGE_SUBJECT)) { + updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler; + } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) { + antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler; + } else { + throw new RuntimeException("Unexpected message subject " + subject.toString()); + } + } + } + + /** + * ClockService implementation that gives out timestamps based on a + * sequential counter. This clock service enables more control over the + * timestamps that are given out, including being able to "turn back time" + * to give out timestamps from the past. + * + * @param <T> Type that the clock service will give out timestamps for + * @param <U> Second type that the clock service will give out values for + */ + private class SequentialClockService<T, U> { + + private static final long INITIAL_VALUE = 1; + private final AtomicLong counter = new AtomicLong(INITIAL_VALUE); + + public Timestamp getTimestamp(T object, U object2) { + return new TestTimestamp(counter.getAndIncrement()); + } + + /** + * Returns what the next timestamp will be without consuming the + * timestamp. This allows test code to set expectations correctly while + * still allowing the CUT to get the same timestamp. + * + * @return timestamp equal to the timestamp that will be returned by the + * next call to {@link #getTimestamp(T, U)}. + */ + public Timestamp peekAtNextTimestamp() { + return peek(1); + } + + /** + * Returns the ith timestamp to be given out in the future without + * consuming the timestamp. For example, i=1 returns the next timestamp, + * i=2 returns the timestamp after that, and so on. + * + * @param i number of the timestamp to peek at + * @return the ith timestamp that will be given out + */ + public Timestamp peek(int i) { + checkArgument(i > 0, "i must be a positive integer"); + + return new TestTimestamp(counter.get() + i - 1); + } + + /** + * Turns the clock back two ticks, so the next call to getTimestamp will + * return an older timestamp than the previous call to getTimestamp. + */ + public void turnBackTime() { + // Not atomic, but should be OK for these tests. + counter.decrementAndGet(); + counter.decrementAndGet(); + } + + } + + /** + * Timestamp implementation where the value of the timestamp can be + * specified explicitly at creation time. + */ + private class TestTimestamp implements Timestamp { + + private final long timestamp; + + /** + * Creates a new timestamp that has the specified value. + * + * @param timestamp value of the timestamp + */ + public TestTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public int compareTo(Timestamp o) { + checkArgument(o instanceof TestTimestamp); + TestTimestamp otherTimestamp = (TestTimestamp) o; + return ComparisonChain.start() + .compare(this.timestamp, otherTimestamp.timestamp) + .result(); + } + } + + /** + * EventuallyConsistentMapListener implementation which triggers a latch + * when it receives an event. + */ + private class TestListener implements EventuallyConsistentMapListener<String, String> { + private CountDownLatch latch; + + /** + * Creates a new listener that will trigger the specified latch when it + * receives and event. + * + * @param latch the latch to trigger on events + */ + public TestListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void event(EventuallyConsistentMapEvent<String, String> event) { + latch.countDown(); + } + } +} |