From bf887b0ecebf65746684691a0cd4d448ad8606f1 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 22 Aug 2018 13:05:08 +0200 Subject: [PATCH] Cleanup TestKit use Do not subclass TestKit, but instantiate it as a common resource. Also use static imports for mocking/asserts. This makes the tests cleaner and less verbose. Change-Id: I18d8765c8fb5019c9f4aa94bc9474277443a3097 Signed-off-by: Robert Varga --- .../DataTreeChangeListenerActorTest.java | 172 +- .../DataTreeChangeListenerProxyTest.java | 266 ++- .../datastore/RoleChangeNotifierTest.java | 119 +- .../datastore/ShardTransactionTest.java | 423 ++-- ...ficationListenerRegistrationActorTest.java | 98 +- .../actors/ShardSnapshotActorTest.java | 47 +- .../shardmanager/ShardManagerTest.java | 1757 ++++++++--------- .../datastore/utils/ActorContextTest.java | 181 +- .../sharding/RoleChangeListenerActorTest.java | 17 +- .../controller/remote/rpc/RpcBrokerTest.java | 47 +- .../remote/rpc/RpcListenerTest.java | 40 +- 11 files changed, 1435 insertions(+), 1732 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java index 627e434e5b..5db25428bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActorTest.java @@ -7,6 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH; import akka.actor.ActorRef; @@ -14,10 +20,8 @@ import akka.actor.DeadLetter; import akka.actor.Props; import akka.testkit.javadsl.TestKit; import com.google.common.collect.ImmutableList; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; @@ -25,119 +29,107 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; public class DataTreeChangeListenerActorTest extends AbstractActorTest { + private TestKit testKit; + + @Before + public void before() { + testKit = new TestKit(getSystem()); + } @Test public void testDataChangedWhenNotificationsAreEnabled() { - new TestKit(getSystem()) { - { - final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); - final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); - final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); - final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled"); + final DataTreeCandidate mockTreeCandidate = mock(DataTreeCandidate.class); + final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); + final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); + final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsEnabled"); - // Let the DataChangeListener know that notifications should be - // enabled - subject.tell(new EnableNotification(true, "test"), getRef()); + // Let the DataChangeListener know that notifications should be + // enabled + subject.tell(new EnableNotification(true, "test"), testKit.getRef()); - subject.tell(new DataTreeChanged(mockCandidates), getRef()); + subject.tell(new DataTreeChanged(mockCandidates), testKit.getRef()); - expectMsgClass(DataTreeChangedReply.class); + testKit.expectMsgClass(DataTreeChangedReply.class); - Mockito.verify(mockListener).onDataTreeChanged(mockCandidates); - } - }; + verify(mockListener).onDataTreeChanged(mockCandidates); } @Test public void testDataChangedWhenNotificationsAreDisabled() { - new TestKit(getSystem()) { - { - final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); - final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); - final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); - final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled"); - - subject.tell(new DataTreeChanged(mockCandidates), getRef()); - - within(duration("1 seconds"), () -> { - expectNoMessage(); - Mockito.verify(mockListener, Mockito.never()) - .onDataTreeChanged(Matchers.anyCollectionOf(DataTreeCandidate.class)); - return null; - }); - } - }; + final DataTreeCandidate mockTreeCandidate = mock(DataTreeCandidate.class); + final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); + final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); + final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedNotificationsDisabled"); + + subject.tell(new DataTreeChanged(mockCandidates), testKit.getRef()); + + testKit.within(testKit.duration("1 seconds"), () -> { + testKit.expectNoMessage(); + verify(mockListener, never()).onDataTreeChanged(anyCollection()); + return null; + }); } @Test public void testDataChangedWithNoSender() { - new TestKit(getSystem()) { - { - final DataTreeCandidate mockTreeCandidate = Mockito.mock(DataTreeCandidate.class); - final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); - final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); - final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender"); - - getSystem().eventStream().subscribe(getRef(), DeadLetter.class); - - subject.tell(new DataTreeChanged(mockCandidates), ActorRef.noSender()); - - // Make sure no DataChangedReply is sent to DeadLetters. - while (true) { - DeadLetter deadLetter; - try { - deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class); - } catch (AssertionError e) { - // Timed out - got no DeadLetter - this is good - break; - } - - // We may get DeadLetters for other messages we don't care - // about. - Assert.assertFalse("Unexpected DataTreeChangedReply", - deadLetter.message() instanceof DataTreeChangedReply); - } + final DataTreeCandidate mockTreeCandidate = mock(DataTreeCandidate.class); + final ImmutableList mockCandidates = ImmutableList.of(mockTreeCandidate); + final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class); + final Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); + final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithNoSender"); + + getSystem().eventStream().subscribe(testKit.getRef(), DeadLetter.class); + + subject.tell(new DataTreeChanged(mockCandidates), ActorRef.noSender()); + + // Make sure no DataChangedReply is sent to DeadLetters. + while (true) { + DeadLetter deadLetter; + try { + deadLetter = testKit.expectMsgClass(testKit.duration("1 seconds"), DeadLetter.class); + } catch (AssertionError e) { + // Timed out - got no DeadLetter - this is good + break; } - }; + + // We may get DeadLetters for other messages we don't care + // about. + assertFalse("Unexpected DataTreeChangedReply", deadLetter.message() instanceof DataTreeChangedReply); + } } @Test public void testDataChangedWithListenerRuntimeEx() { - new TestKit(getSystem()) { - { - final DataTreeCandidate mockTreeCandidate1 = Mockito.mock(DataTreeCandidate.class); - final ImmutableList mockCandidates1 = ImmutableList.of(mockTreeCandidate1); - final DataTreeCandidate mockTreeCandidate2 = Mockito.mock(DataTreeCandidate.class); - final ImmutableList mockCandidates2 = ImmutableList.of(mockTreeCandidate2); - final DataTreeCandidate mockTreeCandidate3 = Mockito.mock(DataTreeCandidate.class); - final ImmutableList mockCandidates3 = ImmutableList.of(mockTreeCandidate3); + final DataTreeCandidate mockTreeCandidate1 = mock(DataTreeCandidate.class); + final ImmutableList mockCandidates1 = ImmutableList.of(mockTreeCandidate1); + final DataTreeCandidate mockTreeCandidate2 = mock(DataTreeCandidate.class); + final ImmutableList mockCandidates2 = ImmutableList.of(mockTreeCandidate2); + final DataTreeCandidate mockTreeCandidate3 = mock(DataTreeCandidate.class); + final ImmutableList mockCandidates3 = ImmutableList.of(mockTreeCandidate3); - final DOMDataTreeChangeListener mockListener = Mockito.mock(DOMDataTreeChangeListener.class); - Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2); + final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class); + doThrow(new RuntimeException("mock")).when(mockListener).onDataTreeChanged(mockCandidates2); - Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); - ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx"); + Props props = DataTreeChangeListenerActor.props(mockListener, TEST_PATH); + ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedWithListenerRuntimeEx"); - // Let the DataChangeListener know that notifications should be - // enabled - subject.tell(new EnableNotification(true, "test"), getRef()); + // Let the DataChangeListener know that notifications should be + // enabled + subject.tell(new EnableNotification(true, "test"), testKit.getRef()); - subject.tell(new DataTreeChanged(mockCandidates1), getRef()); - expectMsgClass(DataTreeChangedReply.class); + subject.tell(new DataTreeChanged(mockCandidates1), testKit.getRef()); + testKit.expectMsgClass(DataTreeChangedReply.class); - subject.tell(new DataTreeChanged(mockCandidates2), getRef()); - expectMsgClass(DataTreeChangedReply.class); + subject.tell(new DataTreeChanged(mockCandidates2), testKit.getRef()); + testKit.expectMsgClass(DataTreeChangedReply.class); - subject.tell(new DataTreeChanged(mockCandidates3), getRef()); - expectMsgClass(DataTreeChangedReply.class); + subject.tell(new DataTreeChanged(mockCandidates3), testKit.getRef()); + testKit.expectMsgClass(DataTreeChangedReply.class); - Mockito.verify(mockListener).onDataTreeChanged(mockCandidates1); - Mockito.verify(mockListener).onDataTreeChanged(mockCandidates2); - Mockito.verify(mockListener).onDataTreeChanged(mockCandidates3); - } - }; + verify(mockListener).onDataTreeChanged(mockCandidates1); + verify(mockListener).onDataTreeChanged(mockCandidates2); + verify(mockListener).onDataTreeChanged(mockCandidates3); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java index aac23e725b..545b976ee8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -24,7 +25,6 @@ import akka.util.Timeout; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; -import org.junit.Assert; import org.junit.Test; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.common.actor.Dispatchers; @@ -51,211 +51,193 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { @Test(timeout = 10000) public void testSuccessfulRegistration() { - new TestKit(getSystem()) { - { - ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), - mock(Configuration.class)); + final TestKit kit = new TestKit(getSystem()); + ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener, path); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, path); - new Thread(() -> proxy.init("shard-1")).start(); + new Thread(() -> proxy.init("shard-1")).start(); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + FiniteDuration timeout = kit.duration("5 seconds"); + FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new LocalShardFound(getRef())); + kit.reply(new LocalShardFound(kit.getRef())); - RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, - RegisterDataTreeChangeListener.class); - Assert.assertEquals("getPath", path, registerMsg.getPath()); - Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances()); + RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout, + RegisterDataTreeChangeListener.class); + assertEquals("getPath", path, registerMsg.getPath()); + assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances()); - reply(new RegisterDataTreeNotificationListenerReply(getRef())); + kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef())); - for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - } + for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } - Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), - proxy.getListenerRegistrationActor()); + assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()), + proxy.getListenerRegistrationActor()); - watch(proxy.getDataChangeListenerActor()); + kit.watch(proxy.getDataChangeListenerActor()); - proxy.close(); + proxy.close(); - // The listener registration actor should get a Close message - expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); + // The listener registration actor should get a Close message + kit.expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); - // The DataChangeListener actor should be terminated - expectMsgClass(timeout, Terminated.class); + // The DataChangeListener actor should be terminated + kit.expectMsgClass(timeout, Terminated.class); - proxy.close(); + proxy.close(); - expectNoMessage(); - } - }; + kit.expectNoMessage(); } @Test(timeout = 10000) public void testSuccessfulRegistrationForClusteredListener() { - new TestKit(getSystem()) { - { - ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), - mock(Configuration.class)); + final TestKit kit = new TestKit(getSystem()); + ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + mock(Configuration.class)); - ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( - ClusteredDOMDataTreeChangeListener.class); + ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( + ClusteredDOMDataTreeChangeListener.class); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataTreeChangeListenerProxy proxy = + new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path); - new Thread(() -> proxy.init("shard-1")).start(); + new Thread(() -> proxy.init("shard-1")).start(); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + FiniteDuration timeout = kit.duration("5 seconds"); + FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new LocalShardFound(getRef())); + kit.reply(new LocalShardFound(kit.getRef())); - RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, - RegisterDataTreeChangeListener.class); - Assert.assertEquals("getPath", path, registerMsg.getPath()); - Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); + RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout, + RegisterDataTreeChangeListener.class); + assertEquals("getPath", path, registerMsg.getPath()); + assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); - proxy.close(); - } - }; + proxy.close(); } @Test(timeout = 10000) public void testLocalShardNotFound() { - new TestKit(getSystem()) { - { - ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), - mock(Configuration.class)); + final TestKit kit = new TestKit(getSystem()); + ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener, path); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, path); - new Thread(() -> proxy.init("shard-1")).start(); + new Thread(() -> proxy.init("shard-1")).start(); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + FiniteDuration timeout = kit.duration("5 seconds"); + FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new LocalShardNotFound("shard-1")); + kit.reply(new LocalShardNotFound("shard-1")); - expectNoMessage(duration("1 seconds")); + kit.expectNoMessage(kit.duration("1 seconds")); - proxy.close(); - } - }; + proxy.close(); } @Test(timeout = 10000) public void testLocalShardNotInitialized() { - new TestKit(getSystem()) { - { - ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), - mock(Configuration.class)); + final TestKit kit = new TestKit(getSystem()); + ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener, path); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, path); - new Thread(() -> proxy.init("shard-1")).start(); + new Thread(() -> proxy.init("shard-1")).start(); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + FiniteDuration timeout = kit.duration("5 seconds"); + FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new NotInitializedException("not initialized")); + kit.reply(new NotInitializedException("not initialized")); - within(duration("1 seconds"), () -> { - expectNoMessage(); - return null; - }); + kit.within(kit.duration("1 seconds"), () -> { + kit.expectNoMessage(); + return null; + }); - proxy.close(); - } - }; + proxy.close(); } @Test public void testFailedRegistration() { - new TestKit(getSystem()) { - { - ActorSystem mockActorSystem = mock(ActorSystem.class); + final TestKit kit = new TestKit(getSystem()); + ActorSystem mockActorSystem = mock(ActorSystem.class); - ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration"); - doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); - ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()); + ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration"); + doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); + ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()); - ActorContext actorContext = mock(ActorContext.class); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + ActorContext actorContext = mock(ActorContext.class); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - doReturn(executor).when(actorContext).getClientDispatcher(); - doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); - doReturn(mockActorSystem).when(actorContext).getActorSystem(); + doReturn(executor).when(actorContext).getClientDispatcher(); + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + doReturn(mockActorSystem).when(actorContext).getActorSystem(); - String shardName = "shard-1"; - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener, path); + String shardName = "shard-1"; + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, path); - doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); - doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); - doReturn(Futures.failed(new RuntimeException("mock"))).when(actorContext) - .executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); - doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); + doReturn(kit.duration("5 seconds")).when(actorContext).getOperationDuration(); + doReturn(Futures.successful(kit.getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); + doReturn(Futures.failed(new RuntimeException("mock"))).when(actorContext).executeOperationAsync( + any(ActorRef.class), any(Object.class), any(Timeout.class)); + doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); - proxy.init("shard-1"); + proxy.init("shard-1"); - Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); + assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); - proxy.close(); - } - }; + proxy.close(); } @Test public void testCloseBeforeRegistration() { - new TestKit(getSystem()) { - { - ActorContext actorContext = mock(ActorContext.class); - - String shardName = "shard-1"; - - doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); - doReturn(getSystem()).when(actorContext).getActorSystem(); - doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath(); - doReturn(getSystem().actorSelection(getRef().path())).when(actorContext) - .actorSelection(getRef().path()); - doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); - doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); - - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); - - Answer> answer = invocation -> { - proxy.close(); - return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(getRef())); - }; + final TestKit kit = new TestKit(getSystem()); + ActorContext actorContext = mock(ActorContext.class); + + String shardName = "shard-1"; + + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); + doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath(); + doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorContext).actorSelection( + kit.getRef().path()); + doReturn(kit.duration("5 seconds")).when(actorContext).getOperationDuration(); + doReturn(Futures.successful(kit.getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); + + final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( + actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); + + Answer> answer = invocation -> { + proxy.close(); + return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef())); + }; - doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class), - any(Timeout.class)); + doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class), + any(Timeout.class)); - proxy.init(shardName); + proxy.init(shardName); - expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class); + kit.expectMsgClass(kit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class); - Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); - } - }; + assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java index bbcedd6811..be2ba8ce71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.assertNull; import akka.actor.ActorRef; import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -26,95 +27,87 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; public class RoleChangeNotifierTest extends AbstractActorTest { + private TestKit testKit; + + @Before + public void setup() { + testKit = new TestKit(getSystem()); + } @Test public void testHandleRegisterRoleChangeListener() { - new TestKit(getSystem()) { - { - String memberId = "testHandleRegisterRoleChangeListener"; - ActorRef listenerActor = getSystem().actorOf(MessageCollectorActor.props()); + String memberId = "testHandleRegisterRoleChangeListener"; + ActorRef listenerActor = getSystem().actorOf(MessageCollectorActor.props()); - TestActorRef notifierTestActorRef = TestActorRef.create(getSystem(), - RoleChangeNotifier.getProps(memberId), memberId); + TestActorRef notifierTestActorRef = TestActorRef.create(getSystem(), + RoleChangeNotifier.getProps(memberId), memberId); - notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); + notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); - RegisterRoleChangeListenerReply reply = MessageCollectorActor.getFirstMatching(listenerActor, - RegisterRoleChangeListenerReply.class); - assertNotNull(reply); + RegisterRoleChangeListenerReply reply = MessageCollectorActor.getFirstMatching(listenerActor, + RegisterRoleChangeListenerReply.class); + assertNotNull(reply); - RoleChangeNotification notification = MessageCollectorActor.getFirstMatching(listenerActor, - RoleChangeNotification.class); - assertNull(notification); - } - }; + RoleChangeNotification notification = MessageCollectorActor.getFirstMatching(listenerActor, + RoleChangeNotification.class); + assertNull(notification); } @Test public void testHandleRaftRoleChanged() { - new TestKit(getSystem()) { - { - String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet"; - ActorRef listenerActor = getSystem().actorOf(MessageCollectorActor.props()); - ActorRef shardActor = getTestActor(); - - TestActorRef notifierTestActorRef = TestActorRef.create(getSystem(), - RoleChangeNotifier.getProps(memberId), memberId); - - notifierTestActorRef.tell( - new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); + String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet"; + ActorRef listenerActor = getSystem().actorOf(MessageCollectorActor.props()); + ActorRef shardActor = testKit.getTestActor(); - // no notification should be sent as listener has not yet - // registered - assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class)); + TestActorRef notifierTestActorRef = TestActorRef.create(getSystem(), + RoleChangeNotifier.getProps(memberId), memberId); - // listener registers after role has been changed, ensure we - // sent the latest role change after a reply - notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); + notifierTestActorRef.tell( + new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); - RegisterRoleChangeListenerReply reply = MessageCollectorActor.getFirstMatching(listenerActor, - RegisterRoleChangeListenerReply.class); - assertNotNull(reply); + // no notification should be sent as listener has not yet + // registered + assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class)); - RoleChangeNotification notification = MessageCollectorActor.getFirstMatching(listenerActor, - RoleChangeNotification.class); - assertNotNull(notification); - assertEquals(RaftState.Candidate.name(), notification.getOldRole()); - assertEquals(RaftState.Leader.name(), notification.getNewRole()); + // listener registers after role has been changed, ensure we + // sent the latest role change after a reply + notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor); - } - }; + RegisterRoleChangeListenerReply reply = MessageCollectorActor.getFirstMatching(listenerActor, + RegisterRoleChangeListenerReply.class); + assertNotNull(reply); + RoleChangeNotification notification = MessageCollectorActor.getFirstMatching(listenerActor, + RoleChangeNotification.class); + assertNotNull(notification); + assertEquals(RaftState.Candidate.name(), notification.getOldRole()); + assertEquals(RaftState.Leader.name(), notification.getNewRole()); } @Test public void testHandleLeaderStateChanged() { - new TestKit(getSystem()) { - { - String actorId = "testHandleLeaderStateChanged"; - TestActorRef notifierTestActorRef = TestActorRef.create(getSystem(), - RoleChangeNotifier.getProps(actorId), actorId); + String actorId = "testHandleLeaderStateChanged"; + TestActorRef notifierTestActorRef = TestActorRef.create(getSystem(), + RoleChangeNotifier.getProps(actorId), actorId); - notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1", (short) 5), ActorRef.noSender()); + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1", (short) 5), ActorRef.noSender()); - // listener registers after the sate has been changed, ensure we - // sent the latest state change after a reply - notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef()); + // listener registers after the sate has been changed, ensure we + // sent the latest state change after a reply + notifierTestActorRef.tell(new RegisterRoleChangeListener(), testKit.getRef()); - expectMsgClass(RegisterRoleChangeListenerReply.class); + testKit.expectMsgClass(RegisterRoleChangeListenerReply.class); - LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class); - assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); - assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId()); - assertEquals("getLeaderPayloadVersion", 5, leaderStateChanged.getLeaderPayloadVersion()); + LeaderStateChanged leaderStateChanged = testKit.expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId()); + assertEquals("getLeaderPayloadVersion", 5, leaderStateChanged.getLeaderPayloadVersion()); - notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2", (short) 6), ActorRef.noSender()); + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2", (short) 6), ActorRef.noSender()); - leaderStateChanged = expectMsgClass(LeaderStateChanged.class); - assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); - assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId()); - assertEquals("getLeaderPayloadVersion", 6, leaderStateChanged.getLeaderPayloadVersion()); - } - }; + leaderStateChanged = testKit.expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId()); + assertEquals("getLeaderPayloadVersion", 6, leaderStateChanged.getLeaderPayloadVersion()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index d1f47852c6..8f952acdd5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -5,14 +5,16 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.Props; @@ -26,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -68,6 +69,7 @@ public class ShardTransactionTest extends AbstractActorTest { private TestActorRef shard; private ShardDataTree store; + private TestKit testKit; @Before public void setUp() { @@ -76,6 +78,7 @@ public class ShardTransactionTest extends AbstractActorTest { .withDispatcher(Dispatchers.DefaultDispatcherId())); ShardTestKit.waitUntilLeader(shard); store = shard.underlyingActor().getDataStore(); + testKit = new TestKit(getSystem()); } private ActorRef newTransactionActor(final TransactionType type, @@ -95,315 +98,257 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadData() { - new TestKit(getSystem()) { - { - testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO")); - - testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW")); - } + testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO")); + testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW")); + } - private void testOnReceiveReadData(final ActorRef transaction) { - transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), - getRef()); + private void testOnReceiveReadData(final ActorRef transaction) { + transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), + testKit.getRef()); - ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class); + ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class); - assertNotNull(reply.getNormalizedNode()); - } - }; + assertNotNull(reply.getNormalizedNode()); } @Test public void testOnReceiveReadDataWhenDataNotFound() { - new TestKit(getSystem()) { - { - testOnReceiveReadDataWhenDataNotFound( - newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO")); - - testOnReceiveReadDataWhenDataNotFound( - newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW")); - } + testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(), + "testReadDataWhenDataNotFoundRO")); + testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(), + "testReadDataWhenDataNotFoundRW")); + } - private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) { - transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef()); + private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) { + transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef()); - ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class); + ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class); - assertTrue(reply.getNormalizedNode() == null); - } - }; + assertNull(reply.getNormalizedNode()); } @Test public void testOnReceiveDataExistsPositive() { - new TestKit(getSystem()) { - { - testOnReceiveDataExistsPositive( - newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO")); - - testOnReceiveDataExistsPositive( - newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW")); - } + testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO")); + testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW")); + } - private void testOnReceiveDataExistsPositive(final ActorRef transaction) { - transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), - getRef()); + private void testOnReceiveDataExistsPositive(final ActorRef transaction) { + transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), + testKit.getRef()); - DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class); + DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class); - assertTrue(reply.exists()); - } - }; + assertTrue(reply.exists()); } @Test public void testOnReceiveDataExistsNegative() { - new TestKit(getSystem()) { - { - testOnReceiveDataExistsNegative( - newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO")); - - testOnReceiveDataExistsNegative( - newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW")); - } + testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO")); + testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW")); + } - private void testOnReceiveDataExistsNegative(final ActorRef transaction) { - transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef()); + private void testOnReceiveDataExistsNegative(final ActorRef transaction) { + transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef()); - DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class); + DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class); - assertFalse(reply.exists()); - } - }; + assertFalse(reply.exists()); } @Test public void testOnReceiveBatchedModifications() { - new TestKit(getSystem()) { - { - ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class); - DataTreeModification mockModification = Mockito.mock(DataTreeModification.class); - ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, - nextTransactionId(), mockModification); - final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications"); - - YangInstanceIdentifier writePath = TestModel.TEST_PATH; - NormalizedNode writeData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - - YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH; - NormalizedNode mergeData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)) - .build(); - - YangInstanceIdentifier deletePath = TestModel.TEST_PATH; - - BatchedModifications batched = new BatchedModifications(nextTransactionId(), - DataStoreVersions.CURRENT_VERSION); - batched.addModification(new WriteModification(writePath, writeData)); - batched.addModification(new MergeModification(mergePath, mergeData)); - batched.addModification(new DeleteModification(deletePath)); - - transaction.tell(batched, getRef()); - - BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), - BatchedModificationsReply.class); - assertEquals("getNumBatched", 3, reply.getNumBatched()); - - InOrder inOrder = Mockito.inOrder(mockModification); - inOrder.verify(mockModification).write(writePath, writeData); - inOrder.verify(mockModification).merge(mergePath, mergeData); - inOrder.verify(mockModification).delete(deletePath); - } - }; + ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class); + DataTreeModification mockModification = mock(DataTreeModification.class); + ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, + nextTransactionId(), mockModification); + final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications"); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH; + NormalizedNode mergeData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)) + .build(); + + YangInstanceIdentifier deletePath = TestModel.TEST_PATH; + + BatchedModifications batched = new BatchedModifications(nextTransactionId(), + DataStoreVersions.CURRENT_VERSION); + batched.addModification(new WriteModification(writePath, writeData)); + batched.addModification(new MergeModification(mergePath, mergeData)); + batched.addModification(new DeleteModification(deletePath)); + + transaction.tell(batched, testKit.getRef()); + + BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), + BatchedModificationsReply.class); + assertEquals("getNumBatched", 3, reply.getNumBatched()); + + InOrder inOrder = inOrder(mockModification); + inOrder.verify(mockModification).write(writePath, writeData); + inOrder.verify(mockModification).merge(mergePath, mergeData); + inOrder.verify(mockModification).delete(deletePath); } @Test public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() { - new TestKit(getSystem()) { - { - final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), - "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit"); - - TestKit watcher = new TestKit(getSystem()); - watcher.watch(transaction); - - YangInstanceIdentifier writePath = TestModel.TEST_PATH; - NormalizedNode writeData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - - final TransactionIdentifier tx1 = nextTransactionId(); - BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); - batched.addModification(new WriteModification(writePath, writeData)); - - transaction.tell(batched, getRef()); - BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), - BatchedModificationsReply.class); - assertEquals("getNumBatched", 1, reply.getNumBatched()); - - batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); - batched.setReady(); - batched.setTotalMessagesSent(2); - - transaction.tell(batched, getRef()); - expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); - watcher.expectMsgClass(duration("5 seconds"), Terminated.class); - } - }; + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), + "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit"); + + TestKit watcher = new TestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + final TransactionIdentifier tx1 = nextTransactionId(); + BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); + batched.addModification(new WriteModification(writePath, writeData)); + + transaction.tell(batched, testKit.getRef()); + BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), + BatchedModificationsReply.class); + assertEquals("getNumBatched", 1, reply.getNumBatched()); + + batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); + batched.setReady(); + batched.setTotalMessagesSent(2); + + transaction.tell(batched, testKit.getRef()); + testKit.expectMsgClass(testKit.duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class); } @Test public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() { - new TestKit(getSystem()) { - { - final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), - "testOnReceiveBatchedModificationsReadyWithImmediateCommit"); - - TestKit watcher = new TestKit(getSystem()); - watcher.watch(transaction); - - YangInstanceIdentifier writePath = TestModel.TEST_PATH; - NormalizedNode writeData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - - BatchedModifications batched = new BatchedModifications(nextTransactionId(), - DataStoreVersions.CURRENT_VERSION); - batched.addModification(new WriteModification(writePath, writeData)); - batched.setReady(); - batched.setDoCommitOnReady(true); - batched.setTotalMessagesSent(1); - - transaction.tell(batched, getRef()); - expectMsgClass(duration("5 seconds"), CommitTransactionReply.class); - watcher.expectMsgClass(duration("5 seconds"), Terminated.class); - } - }; + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), + "testOnReceiveBatchedModificationsReadyWithImmediateCommit"); + + TestKit watcher = new TestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + BatchedModifications batched = new BatchedModifications(nextTransactionId(), + DataStoreVersions.CURRENT_VERSION); + batched.addModification(new WriteModification(writePath, writeData)); + batched.setReady(); + batched.setDoCommitOnReady(true); + batched.setTotalMessagesSent(1); + + transaction.tell(batched, testKit.getRef()); + testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class); + watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class); } @Test(expected = TestException.class) public void testOnReceiveBatchedModificationsFailure() throws Exception { - new TestKit(getSystem()) { - { + ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class); + DataTreeModification mockModification = mock(DataTreeModification.class); + ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, + nextTransactionId(), mockModification); + final ActorRef transaction = newTransactionActor(RW, mockWriteTx, + "testOnReceiveBatchedModificationsFailure"); - ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class); - DataTreeModification mockModification = Mockito.mock(DataTreeModification.class); - ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, - nextTransactionId(), mockModification); - final ActorRef transaction = newTransactionActor(RW, mockWriteTx, - "testOnReceiveBatchedModificationsFailure"); + TestKit watcher = new TestKit(getSystem()); + watcher.watch(transaction); - TestKit watcher = new TestKit(getSystem()); - watcher.watch(transaction); + YangInstanceIdentifier path = TestModel.TEST_PATH; + ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - YangInstanceIdentifier path = TestModel.TEST_PATH; - ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doThrow(new TestException()).when(mockModification).write(path, node); - doThrow(new TestException()).when(mockModification).write(path, node); + final TransactionIdentifier tx1 = nextTransactionId(); + BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); + batched.addModification(new WriteModification(path, node)); - final TransactionIdentifier tx1 = nextTransactionId(); - BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); - batched.addModification(new WriteModification(path, node)); + transaction.tell(batched, testKit.getRef()); + testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class); - transaction.tell(batched, getRef()); - expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); + batched.setReady(); + batched.setTotalMessagesSent(2); - batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION); - batched.setReady(); - batched.setTotalMessagesSent(2); + transaction.tell(batched, testKit.getRef()); + Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class); - transaction.tell(batched, getRef()); - Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); - watcher.expectMsgClass(duration("5 seconds"), Terminated.class); - - if (failure != null) { - Throwables.propagateIfPossible(failure.cause(), Exception.class); - throw new RuntimeException(failure.cause()); - } - } - }; + if (failure != null) { + Throwables.propagateIfPossible(failure.cause(), Exception.class); + throw new RuntimeException(failure.cause()); + } } @Test(expected = IllegalStateException.class) public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception { - new TestKit(getSystem()) { - { - - final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), - "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount"); + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), + "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount"); - TestKit watcher = new TestKit(getSystem()); - watcher.watch(transaction); + TestKit watcher = new TestKit(getSystem()); + watcher.watch(transaction); - BatchedModifications batched = new BatchedModifications(nextTransactionId(), - DataStoreVersions.CURRENT_VERSION); - batched.setReady(); - batched.setTotalMessagesSent(2); + BatchedModifications batched = new BatchedModifications(nextTransactionId(), + DataStoreVersions.CURRENT_VERSION); + batched.setReady(); + batched.setTotalMessagesSent(2); - transaction.tell(batched, getRef()); + transaction.tell(batched, testKit.getRef()); - Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); - watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class); + watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class); - if (failure != null) { - Throwables.throwIfInstanceOf(failure.cause(), Exception.class); - Throwables.throwIfUnchecked(failure.cause()); - throw new RuntimeException(failure.cause()); - } - } - }; + if (failure != null) { + Throwables.throwIfInstanceOf(failure.cause(), Exception.class); + Throwables.throwIfUnchecked(failure.cause()); + throw new RuntimeException(failure.cause()); + } } @Test public void testReadWriteTxOnReceiveCloseTransaction() { - new TestKit(getSystem()) { - { - final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), - "testReadWriteTxOnReceiveCloseTransaction"); + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), + "testReadWriteTxOnReceiveCloseTransaction"); - watch(transaction); + testKit.watch(transaction); - transaction.tell(new CloseTransaction().toSerializable(), getRef()); + transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef()); - expectMsgClass(duration("3 seconds"), CloseTransactionReply.class); - expectTerminated(duration("3 seconds"), transaction); - } - }; + testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class); + testKit.expectTerminated(testKit.duration("3 seconds"), transaction); } @Test public void testWriteOnlyTxOnReceiveCloseTransaction() { - new TestKit(getSystem()) { - { - final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), - "testWriteTxOnReceiveCloseTransaction"); + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), + "testWriteTxOnReceiveCloseTransaction"); - watch(transaction); + testKit.watch(transaction); - transaction.tell(new CloseTransaction().toSerializable(), getRef()); + transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef()); - expectMsgClass(duration("3 seconds"), CloseTransactionReply.class); - expectTerminated(duration("3 seconds"), transaction); - } - }; + testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class); + testKit.expectTerminated(testKit.duration("3 seconds"), transaction); } @Test public void testReadOnlyTxOnReceiveCloseTransaction() { - new TestKit(getSystem()) { - { - final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(), - "testReadOnlyTxOnReceiveCloseTransaction"); + final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(), + "testReadOnlyTxOnReceiveCloseTransaction"); - watch(transaction); + testKit.watch(transaction); - transaction.tell(new CloseTransaction().toSerializable(), getRef()); + transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef()); - expectMsgClass(duration("3 seconds"), Terminated.class); - } - }; + testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class); } @Test @@ -411,16 +356,12 @@ public class ShardTransactionTest extends AbstractActorTest { datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout( 500, TimeUnit.MILLISECONDS).build(); - new TestKit(getSystem()) { - { - final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), - "testShardTransactionInactivity"); + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), + "testShardTransactionInactivity"); - watch(transaction); + testKit.watch(transaction); - expectMsgClass(duration("3 seconds"), Terminated.class); - } - }; + testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class); } public static class TestException extends RuntimeException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java index f5d096bff8..397724b793 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/DataTreeNotificationListenerRegistrationActorTest.java @@ -7,14 +7,15 @@ */ package org.opendaylight.controller.cluster.datastore.actors; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.testkit.javadsl.TestKit; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; @@ -28,81 +29,72 @@ public class DataTreeNotificationListenerRegistrationActorTest extends AbstractA @Mock private Runnable mockOnClose; + private TestKit kit; + @Before public void setup() { MockitoAnnotations.initMocks(this); DataTreeNotificationListenerRegistrationActor.killDelay = 100; + kit = new TestKit(getSystem()); } @Test public void testOnReceiveCloseListenerRegistrationAfterSetRegistration() { - new TestKit(getSystem()) { - { - final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), - "testOnReceiveCloseListenerRegistrationAfterSetRegistration"); - watch(subject); + final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), + "testOnReceiveCloseListenerRegistrationAfterSetRegistration"); + kit.watch(subject); - subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, - mockOnClose), ActorRef.noSender()); - subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef()); + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, + mockOnClose), ActorRef.noSender()); + subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef()); - expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class); + kit.expectMsgClass(kit.duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class); - Mockito.verify(mockListenerReg, timeout(5000)).close(); - Mockito.verify(mockOnClose, timeout(5000)).run(); + verify(mockListenerReg, timeout(5000)).close(); + verify(mockOnClose, timeout(5000)).run(); - expectTerminated(duration("5 second"), subject); - } - }; + kit.expectTerminated(kit.duration("5 second"), subject); } @Test public void testOnReceiveCloseListenerRegistrationBeforeSetRegistration() { - new TestKit(getSystem()) { - { - final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), - "testOnReceiveSetRegistrationAfterPriorClose"); - watch(subject); + final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), + "testOnReceiveSetRegistrationAfterPriorClose"); + kit.watch(subject); - subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef()); - expectMsgClass(duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class); + subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), kit.getRef()); + kit.expectMsgClass(kit.duration("5 second"), CloseDataTreeNotificationListenerRegistrationReply.class); - subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, - mockOnClose), ActorRef.noSender()); + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, + mockOnClose), ActorRef.noSender()); - Mockito.verify(mockListenerReg, timeout(5000)).close(); - Mockito.verify(mockOnClose, timeout(5000)).run(); + verify(mockListenerReg, timeout(5000)).close(); + verify(mockOnClose, timeout(5000)).run(); - expectTerminated(duration("5 second"), subject); - } - }; + kit.expectTerminated(kit.duration("5 second"), subject); } @Test public void testOnReceiveSetRegistrationAfterPriorClose() { - new TestKit(getSystem()) { - { - DataTreeNotificationListenerRegistrationActor.killDelay = 1000; - final ListenerRegistration mockListenerReg2 = Mockito.mock(ListenerRegistration.class); - final Runnable mockOnClose2 = Mockito.mock(Runnable.class); - - final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), - "testOnReceiveSetRegistrationAfterPriorClose"); - watch(subject); - - subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, - mockOnClose), ActorRef.noSender()); - subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); - subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg2, - mockOnClose2), ActorRef.noSender()); - - Mockito.verify(mockListenerReg, timeout(5000)).close(); - Mockito.verify(mockOnClose, timeout(5000)).run(); - Mockito.verify(mockListenerReg2, timeout(5000)).close(); - Mockito.verify(mockOnClose2, timeout(5000)).run(); - - expectTerminated(duration("5 second"), subject); - } - }; + DataTreeNotificationListenerRegistrationActor.killDelay = 1000; + final ListenerRegistration mockListenerReg2 = mock(ListenerRegistration.class); + final Runnable mockOnClose2 = mock(Runnable.class); + + final ActorRef subject = getSystem().actorOf(DataTreeNotificationListenerRegistrationActor.props(), + "testOnReceiveSetRegistrationAfterPriorClose"); + kit.watch(subject); + + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg, + mockOnClose), ActorRef.noSender()); + subject.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); + subject.tell(new DataTreeNotificationListenerRegistrationActor.SetRegistration(mockListenerReg2, + mockOnClose2), ActorRef.noSender()); + + verify(mockListenerReg, timeout(5000)).close(); + verify(mockOnClose, timeout(5000)).run(); + verify(mockListenerReg2, timeout(5000)).close(); + verify(mockOnClose2, timeout(5000)).run(); + + kit.expectTerminated(kit.duration("5 second"), subject); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java index e77ab0357b..30b8bab1b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java @@ -31,37 +31,34 @@ public class ShardSnapshotActorTest extends AbstractActorTest { private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot, final boolean withInstallSnapshot) throws Exception { - new TestKit(getSystem()) { - { - final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName); - watch(snapshotActor); + final TestKit kit = new TestKit(getSystem()); + final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName); + kit.watch(snapshotActor); - final NormalizedNode expectedRoot = snapshot.getRootNode().get(); + final NormalizedNode expectedRoot = snapshot.getRootNode().get(); - ByteArrayOutputStream installSnapshotStream = withInstallSnapshot ? new ByteArrayOutputStream() : null; - ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, - Optional.ofNullable(installSnapshotStream), getRef()); + ByteArrayOutputStream installSnapshotStream = withInstallSnapshot ? new ByteArrayOutputStream() : null; + ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, + Optional.ofNullable(installSnapshotStream), kit.getRef()); - final CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class); - assertNotNull("getSnapshotState is null", reply.getSnapshotState()); - assertEquals("SnapshotState type", ShardSnapshotState.class, reply.getSnapshotState().getClass()); - assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot()); + final CaptureSnapshotReply reply = kit.expectMsgClass(kit.duration("3 seconds"), CaptureSnapshotReply.class); + assertNotNull("getSnapshotState is null", reply.getSnapshotState()); + assertEquals("SnapshotState type", ShardSnapshotState.class, reply.getSnapshotState().getClass()); + assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot()); - if (installSnapshotStream != null) { - final ShardDataTreeSnapshot deserialized; - try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream( - installSnapshotStream.toByteArray()))) { - deserialized = ShardDataTreeSnapshot.deserialize(in); - } + if (installSnapshotStream != null) { + final ShardDataTreeSnapshot deserialized; + try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream( + installSnapshotStream.toByteArray()))) { + deserialized = ShardDataTreeSnapshot.deserialize(in); + } - assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass()); + assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass()); - final Optional> maybeNode = deserialized.getRootNode(); - assertEquals("isPresent", true, maybeNode.isPresent()); - assertEquals("Root node", expectedRoot, maybeNode.get()); - } - } - }; + final Optional> maybeNode = deserialized.getRootNode(); + assertEquals("isPresent", true, maybeNode.isPresent()); + assertEquals("Root node", expectedRoot, maybeNode.get()); + } } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index bfb4129790..4b2ce82cd5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore.shardmanager; import static org.junit.Assert.assertEquals; @@ -14,6 +13,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -61,7 +62,6 @@ import java.util.stream.Collectors; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; @@ -168,8 +168,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) { DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); - Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); - Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString()); + doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); + doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString()); return mockFactory; } @@ -240,11 +240,11 @@ public class ShardManagerTest extends AbstractShardManagerTest { final DatastoreContextFactory mockFactory = newDatastoreContextFactory( datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); - Mockito.doReturn( + doReturn( DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build()) .when(mockFactory).getShardDatastoreContext("default"); - Mockito.doReturn( + doReturn( DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build()) .when(mockFactory).getShardDatastoreContext("topology"); @@ -301,7 +301,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } }; - TestKit kit = new TestKit(getSystem()); + final TestKit kit = new TestKit(getSystem()); final ActorRef shardManager = actorFactory.createActor(Props.create( new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId())); @@ -316,11 +316,11 @@ public class ShardManagerTest extends AbstractShardManagerTest { DatastoreContextFactory newMockFactory = newDatastoreContextFactory( datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); - Mockito.doReturn( + doReturn( DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build()) .when(newMockFactory).getShardDatastoreContext("default"); - Mockito.doReturn( + doReturn( DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build()) .when(newMockFactory).getShardDatastoreContext("topology"); @@ -338,49 +338,43 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryForNonExistentShard() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new FindPrimary("non-existent", false), getRef()); + shardManager.tell(new FindPrimary("non-existent", false), kit.getRef()); - expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); - } - }; + kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class); } @Test public void testOnReceiveFindPrimaryForLocalLeaderShard() { LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting"); - new TestKit(getSystem()) { - { - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + final TestKit kit = new TestKit(getSystem()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, - DataStoreVersions.CURRENT_VERSION), getRef()); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, + DataStoreVersions.CURRENT_VERSION), kit.getRef()); - MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); - shardManager.tell( - new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor); + MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); + shardManager.tell( + new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), - LocalPrimaryShardFound.class); - assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), - primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); - } - }; + LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), + LocalPrimaryShardFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending"); } @@ -388,26 +382,23 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() { LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; - String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.tell( - new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), - mockShardActor); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell( + new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), + mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - } - }; + kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending"); } @@ -415,97 +406,85 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShard() { LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); - - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - - String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.tell( - new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor); - short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; - shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor); - - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - - RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), - RemotePrimaryShardFound.class); - assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), - primaryFound.getPrimaryPath().contains("member-2-shard-default")); - assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); - } - }; + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString()); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell( + new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor); + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); + + RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), + RemotePrimaryShardFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-2-shard-default")); + assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending"); } @Test public void testOnReceiveFindPrimaryForUninitializedShard() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - } - }; + kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); } @Test public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - } - }; + kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); } @Test public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() { LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.tell( - new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell( + new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); - DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, - DataStoreVersions.CURRENT_VERSION), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), - LocalPrimaryShardFound.class); - assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), - primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); - } - }; + LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), + LocalPrimaryShardFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting"); } @@ -514,44 +493,41 @@ public class ShardManagerTest extends AbstractShardManagerTest { public void testOnReceiveFindPrimaryWaitForShardLeader() { LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting"); datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - // We're passing waitUntilInitialized = true to FindPrimary so - // the response should be - // delayed until we send ActorInitialized and - // RoleChangeNotification. - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + // We're passing waitUntilInitialized = true to FindPrimary so + // the response should be + // delayed until we send ActorInitialized and + // RoleChangeNotification. + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(), mockShardActor); - expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.tell( - new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell( + new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor); - expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, - DataStoreVersions.CURRENT_VERSION), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); - LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), - LocalPrimaryShardFound.class); - assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), - primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); + LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), + LocalPrimaryShardFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); - expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); - } - }; + kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending"); } @@ -559,21 +535,18 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() { LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - expectMsgClass(duration("2 seconds"), NotInitializedException.class); + kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(), mockShardActor); - expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); - } - }; + kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending"); } @@ -581,20 +554,17 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() { LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, - RaftState.Candidate.name()), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, + RaftState.Candidate.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); - } - }; + kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending"); } @@ -602,20 +572,17 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() { LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, - RaftState.IsolatedLeader.name()), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, + RaftState.IsolatedLeader.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef()); - expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); - } - }; + kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending"); } @@ -623,18 +590,15 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() { LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); - } - }; + kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending"); } @@ -671,33 +635,30 @@ public class ShardManagerTest extends AbstractShardManagerTest { new ClusterWrapperImpl(system2)).props().withDispatcher( Dispatchers.DefaultDispatcherId()), shardManagerID); - new TestKit(system1) { - { - shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + final TestKit kit = new TestKit(system1); + shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + shardManager2.tell(new ActorInitialized(), mockShardActor2); - String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; - short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; - shardManager2.tell( - new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), - mockShardActor2); - shardManager2.tell( - new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor2); + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor2); - shardManager1.underlyingActor().waitForMemberUp(); - shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + shardManager1.underlyingActor().waitForMemberUp(); + shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef()); - RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); - String path = found.getPrimaryPath(); - assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); - assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion()); + RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion()); - shardManager2.underlyingActor().verifyFindPrimary(); + shardManager2.underlyingActor().verifyFindPrimary(); - // This part times out quite a bit on jenkins for some reason + // This part times out quite a bit on jenkins for some reason // Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); // @@ -706,8 +667,6 @@ public class ShardManagerTest extends AbstractShardManagerTest { // shardManager1.tell(new FindPrimary("astronauts", false), getRef()); // // expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); - } - }; LOG.info("testOnReceiveFindPrimaryForRemoteShard ending"); } @@ -745,91 +704,84 @@ public class ShardManagerTest extends AbstractShardManagerTest { new ClusterWrapperImpl(system2)).props().withDispatcher( Dispatchers.DefaultDispatcherId()), shardManagerID); - new TestKit(system1) { - { - shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager1.tell(new ActorInitialized(), mockShardActor1); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + final TestKit kit = new TestKit(system1); + shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; - String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor1); - shardManager1.tell( - new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor1); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor2); - shardManager2.tell( - new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor2); - shardManager1.underlyingActor().waitForMemberUp(); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell( + new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); + shardManager2.tell( + new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); - shardManager1.tell(new FindPrimary("default", true), getRef()); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); - String path = found.getPrimaryPath(); - assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); - shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka://cluster-test@127.0.0.1:2558"), getRef()); + shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), + kit.getRef()); - shardManager1.underlyingActor().waitForUnreachableMember(); + shardManager1.underlyingActor().waitForUnreachableMember(); - PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); - assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName()); - MessageCollectorActor.clearMessages(mockShardActor1); + PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); - shardManager1.tell( - MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"), - getRef()); + shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"), + kit.getRef()); - MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); - shardManager1.tell(new FindPrimary("default", true), getRef()); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); - shardManager1.tell( - MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), - getRef()); + shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), + kit.getRef()); - shardManager1.underlyingActor().waitForReachableMember(); + shardManager1.underlyingActor().waitForReachableMember(); - PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); - assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName()); - MessageCollectorActor.clearMessages(mockShardActor1); + PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); - shardManager1.tell(new FindPrimary("default", true), getRef()); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); - String path1 = found1.getPrimaryPath(); - assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); + RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); - shardManager1.tell( - MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"), - getRef()); + shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"), + kit.getRef()); - MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); - // Test FindPrimary wait succeeds after reachable member event. + // Test FindPrimary wait succeeds after reachable member event. - shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka://cluster-test@127.0.0.1:2558"), getRef()); - shardManager1.underlyingActor().waitForUnreachableMember(); + shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", + "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); + shardManager1.underlyingActor().waitForUnreachableMember(); - shardManager1.tell(new FindPrimary("default", true), getRef()); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - shardManager1.tell( - MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), - getRef()); + shardManager1.tell( + MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); - RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); - String path2 = found2.getPrimaryPath(); - assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config")); - } - }; + RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + String path2 = found2.getPrimaryPath(); + assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config")); LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending"); } @@ -869,62 +821,58 @@ public class ShardManagerTest extends AbstractShardManagerTest { new ClusterWrapperImpl(system2)).props().withDispatcher( Dispatchers.DefaultDispatcherId()), shardManagerID); - new TestKit(system1) { - { - shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager1.tell(new ActorInitialized(), mockShardActor1); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + final TestKit kit = new TestKit(system1); + shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; - String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor1); - shardManager1.tell( - new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor1); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor2); - shardManager2.tell( - new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor2); - shardManager1.underlyingActor().waitForMemberUp(); - - shardManager1.tell(new FindPrimary("default", true), getRef()); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell( + new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); + shardManager2.tell( + new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); - RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); - String path = found.getPrimaryPath(); - assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo( - system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION)); + RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); - shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka://cluster-test@127.0.0.1:2558"), getRef()); + primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo( + system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION)); - shardManager1.underlyingActor().waitForUnreachableMember(); + shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", + "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); - shardManager1.tell(new FindPrimary("default", true), getRef()); + shardManager1.underlyingActor().waitForUnreachableMember(); - expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - assertNull("Expected primaryShardInfoCache entry removed", - primaryShardInfoCache.getIfPresent("default")); + kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); - shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor1); - shardManager1.tell( - new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()), - mockShardActor1); + assertNull("Expected primaryShardInfoCache entry removed", + primaryShardInfoCache.getIfPresent("default")); - shardManager1.tell(new FindPrimary("default", true), getRef()); + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell( + new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()), + mockShardActor1); - LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); - String path1 = found1.getPrimaryPath(); - assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); + shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - } - }; + LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending"); } @@ -969,137 +917,123 @@ public class ShardManagerTest extends AbstractShardManagerTest { new ClusterWrapperImpl(system2)).props().withDispatcher( Dispatchers.DefaultDispatcherId()), shardManagerID); - new TestKit(system256) { - { - shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager256.tell(new ActorInitialized(), mockShardActor256); - shardManager2.tell(new ActorInitialized(), mockShardActor2); - - String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix; - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; - shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor256); - shardManager256.tell( - new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor256); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), mockShardActor2); - shardManager2.tell( - new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor2); - shardManager256.underlyingActor().waitForMemberUp(); - - shardManager256.tell(new FindPrimary("default", true), getRef()); - - LocalPrimaryShardFound found = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); - String path = found.getPrimaryPath(); - assertTrue("Unexpected primary path " + path + " which must on member-256", - path.contains("member-256-shard-default-config")); - - PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo( - system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION); - primaryShardInfoCache.putSuccessful("default", primaryShardInfo); - - // Simulate member-2 become unreachable. - shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka://cluster-test@127.0.0.1:2558"), getRef()); - shardManager256.underlyingActor().waitForUnreachableMember(); - - // Make sure leader shard on member-256 is still leader and still in the cache. - shardManager256.tell(new FindPrimary("default", true), getRef()); - found = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); - path = found.getPrimaryPath(); - assertTrue("Unexpected primary path " + path + " which must still not on member-256", - path.contains("member-256-shard-default-config")); - Future futurePrimaryShard = primaryShardInfoCache.getIfPresent("default"); - futurePrimaryShard.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) { - if (failure != null) { - assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false); - } else { - assertEquals("Expected primaryShardInfoCache entry", - primaryShardInfo, futurePrimaryShardInfo); - } - } - }, system256.dispatchers().defaultGlobalDispatcher()); + final TestKit kit256 = new TestKit(system256); + shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef()); + shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef()); + shardManager256.tell(new ActorInitialized(), mockShardActor256); + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix; + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor256); + shardManager256.tell( + new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor256); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); + shardManager2.tell( + new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor2); + shardManager256.underlyingActor().waitForMemberUp(); + + shardManager256.tell(new FindPrimary("default", true), kit256.getRef()); + + LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"), + LocalPrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path + " which must on member-256", + path.contains("member-256-shard-default-config")); + + PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo( + system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION); + primaryShardInfoCache.putSuccessful("default", primaryShardInfo); + + // Simulate member-2 become unreachable. + shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2", + "akka://cluster-test@127.0.0.1:2558"), kit256.getRef()); + shardManager256.underlyingActor().waitForUnreachableMember(); + + // Make sure leader shard on member-256 is still leader and still in the cache. + shardManager256.tell(new FindPrimary("default", true), kit256.getRef()); + found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class); + path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path + " which must still not on member-256", + path.contains("member-256-shard-default-config")); + Future futurePrimaryShard = primaryShardInfoCache.getIfPresent("default"); + futurePrimaryShard.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) { + if (failure != null) { + assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false); + } else { + assertEquals("Expected primaryShardInfoCache entry", + primaryShardInfo, futurePrimaryShardInfo); + } } - }; + }, system256.dispatchers().defaultGlobalDispatcher()); LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending"); } @Test public void testOnReceiveFindLocalShardForNonExistentShard() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new FindLocalShard("non-existent", false), getRef()); + shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef()); - LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class); - assertEquals("getShardName", "non-existent", notFound.getShardName()); - } - }; + assertEquals("getShardName", "non-existent", notFound.getShardName()); } @Test public void testOnReceiveFindLocalShardForExistentShard() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); - assertTrue("Found path contains " + found.getPath().path().toString(), - found.getPath().path().toString().contains("member-1-shard-default-config")); - } - }; + assertTrue("Found path contains " + found.getPath().path().toString(), + found.getPath().path().toString().contains("member-1-shard-default-config")); } @Test public void testOnReceiveFindLocalShardForNotInitializedShard() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - } - }; + kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); } @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - // We're passing waitUntilInitialized = true to FindLocalShard - // so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + // We're passing waitUntilInitialized = true to FindLocalShard + // so the response should be + // delayed until we send ActorInitialized. + Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), + new Timeout(5, TimeUnit.SECONDS)); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(), mockShardActor); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); - } - }; + Object resp = Await.result(future, kit.duration("5 seconds")); + assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); } @@ -1122,56 +1056,48 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { - new TestKit(getSystem()) { - { - TestShardManager shardManager = newTestShardManager(); + final TestKit kit = new TestKit(getSystem()); + TestShardManager shardManager = newTestShardManager(); - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - verify(ready, never()).countDown(); + verify(ready, never()).countDown(); - shardManager - .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString())); - shardManager.onReceiveCommand( - new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix, - mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand( + new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix, + mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); - verify(ready, times(1)).countDown(); - } - }; + verify(ready, times(1)).countDown(); } @Test public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { - new TestKit(getSystem()) { - { - TestShardManager shardManager = newTestShardManager(); + final TestKit kit = new TestKit(getSystem()); + TestShardManager shardManager = newTestShardManager(); - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - verify(ready, never()).countDown(); + verify(ready, never()).countDown(); - shardManager.onReceiveCommand( - new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix, - mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); + shardManager.onReceiveCommand( + new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix, + mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); - shardManager - .onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString())); - verify(ready, times(1)).countDown(); - } - }; + verify(ready, times(1)).countDown(); } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { TestShardManager shardManager = newTestShardManager(); - shardManager.onReceiveCommand(new RoleChangeNotification( - "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), + RaftState.Leader.name())); verify(ready, never()).countDown(); } @@ -1230,7 +1156,6 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); assertEquals(false, shardManager.getMBean().getSyncStatus()); - } @Test @@ -1280,22 +1205,19 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnReceiveSwitchShardBehavior() { - new TestKit(getSystem()) { - { - final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef()); + shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef()); - SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, - SwitchBehavior.class); + SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, + SwitchBehavior.class); - assertEquals(RaftState.Leader, switchBehavior.getNewState()); - assertEquals(1000, switchBehavior.getNewTerm()); - } - }; + assertEquals(RaftState.Leader, switchBehavior.getNewState()); + assertEquals(1000, switchBehavior.getNewTerm()); } private static List members(final String... names) { @@ -1305,51 +1227,48 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnCreateShard() { LOG.info("testOnCreateShard starting"); - new TestKit(getSystem()) { - { - datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + final TestKit kit = new TestKit(getSystem()); + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); - ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .withDispatcher(Dispatchers.DefaultDispatcherId())); + ActorRef shardManager = actorFactory + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); - SchemaContext schemaContext = TEST_SCHEMA_CONTEXT; - shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + SchemaContext schemaContext = TEST_SCHEMA_CONTEXT; + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); - DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100) - .persistent(false).build(); - Shard.Builder shardBuilder = Shard.builder(); + DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100) + .persistent(false).build(); + Shard.Builder shardBuilder = Shard.builder(); - ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", - "foo", null, members("member-1", "member-5", "member-6")); - shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef()); + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, members("member-1", "member-5", "member-6")); + shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef()); - expectMsgClass(duration("5 seconds"), Success.class); + kit.expectMsgClass(kit.duration("5 seconds"), Success.class); - shardManager.tell(new FindLocalShard("foo", true), getRef()); + shardManager.tell(new FindLocalShard("foo", true), kit.getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); - assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent()); - assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig() - .getPeerAddressResolver() instanceof ShardPeerAddressResolver); - assertEquals("peerMembers", Sets.newHashSet( - ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(), - ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()), - shardBuilder.getPeerAddresses().keySet()); - assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix), - shardBuilder.getId()); - assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); + assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent()); + assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig() + .getPeerAddressResolver() instanceof ShardPeerAddressResolver); + assertEquals("peerMembers", Sets.newHashSet( + ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(), + ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()), + shardBuilder.getPeerAddresses().keySet()); + assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix), + shardBuilder.getId()); + assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); - // Send CreateShard with same name - should return Success with - // a message. + // Send CreateShard with same name - should return Success with + // a message. - shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef()); - Success success = expectMsgClass(duration("5 seconds"), Success.class); - assertNotNull("Success status is null", success.status()); - } - }; + Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + assertNotNull("Success status is null", success.status()); LOG.info("testOnCreateShard ending"); } @@ -1357,31 +1276,28 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnCreateShardWithLocalMemberNotInShardConfig() { LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting"); - new TestKit(getSystem()) { - { - datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + final TestKit kit = new TestKit(getSystem()); + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); - ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .withDispatcher(Dispatchers.DefaultDispatcherId())); + ActorRef shardManager = actorFactory + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender()); - Shard.Builder shardBuilder = Shard.builder(); - ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", - "foo", null, members("member-5", "member-6")); + Shard.Builder shardBuilder = Shard.builder(); + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, members("member-5", "member-6")); - shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); - expectMsgClass(duration("5 seconds"), Success.class); + shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), Success.class); - shardManager.tell(new FindLocalShard("foo", true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); + shardManager.tell(new FindLocalShard("foo", true), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); - assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size()); - assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder - .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass()); - } - }; + assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size()); + assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder + .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass()); LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending"); } @@ -1389,31 +1305,28 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testOnCreateShardWithNoInitialSchemaContext() { LOG.info("testOnCreateShardWithNoInitialSchemaContext starting"); - new TestKit(getSystem()) { - { - ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .withDispatcher(Dispatchers.DefaultDispatcherId())); + final TestKit kit = new TestKit(getSystem()); + ActorRef shardManager = actorFactory + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); - Shard.Builder shardBuilder = Shard.builder(); + Shard.Builder shardBuilder = Shard.builder(); - ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", - "foo", null, members("member-1")); - shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, members("member-1")); + shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef()); - expectMsgClass(duration("5 seconds"), Success.class); + kit.expectMsgClass(kit.duration("5 seconds"), Success.class); - SchemaContext schemaContext = TEST_SCHEMA_CONTEXT; - shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + SchemaContext schemaContext = TEST_SCHEMA_CONTEXT; + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); - shardManager.tell(new FindLocalShard("foo", true), getRef()); + shardManager.tell(new FindLocalShard("foo", true), kit.getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); - assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); - assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); - } - }; + assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); + assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); LOG.info("testOnCreateShardWithNoInitialSchemaContext ending"); } @@ -1522,18 +1435,15 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testAddShardReplicaForNonExistentShardConfig() { - new TestKit(getSystem()) { - { - ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .withDispatcher(Dispatchers.DefaultDispatcherId())); + final TestKit kit = new TestKit(getSystem()); + ActorRef shardManager = actorFactory + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); - shardManager.tell(new AddShardReplica("model-inventory"), getRef()); - Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef()); + Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException); - } - }; + assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException); } @Test @@ -1573,114 +1483,107 @@ public class ShardManagerTest extends AbstractShardManagerTest { .withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID); - new TestKit(system1) { - { - newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - - leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - - short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; - leaderShardManager.tell( - new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), - mockShardLeaderActor); - leaderShardManager.tell( - new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardLeaderActor); - - newReplicaShardManager.underlyingActor().waitForMemberUp(); - leaderShardManager.underlyingActor().waitForMemberUp(); - - // Have a dummy snapshot to be overwritten by the new data - // persisted. - String[] restoredShards = { "default", "people" }; - ShardManagerSnapshot snapshot = - new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); - InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); - - InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID); - InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID); - - // construct a mock response message - newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); - AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, - AddServer.class); - String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; - assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - expectMsgClass(duration("5 seconds"), Status.Success.class); - - InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class); - InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID); - List persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID, - ShardManagerSnapshot.class); - assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size()); - ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0); - assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"), - Sets.newHashSet(shardManagerSnapshot.getShardList())); - } - }; + final TestKit kit = new TestKit(getSystem()); + newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell( + new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), + mockShardLeaderActor); + leaderShardManager.tell( + new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardLeaderActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + // Have a dummy snapshot to be overwritten by the new data + // persisted. + String[] restoredShards = { "default", "people" }; + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); + InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); + + InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID); + InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID); + + // construct a mock response message + newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); + AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + AddServer.class); + String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; + assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); + kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class); + + InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class); + InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID); + List persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID, + ShardManagerSnapshot.class); + assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size()); + ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0); + assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"), + Sets.newHashSet(shardManagerSnapshot.getShardList())); LOG.info("testAddShardReplica ending"); } @Test public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() { LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting"); - new TestKit(getSystem()) { - { - TestActorRef shardManager = actorFactory - .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID); + final TestKit kit = new TestKit(getSystem()); + TestActorRef shardManager = actorFactory + .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; - AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); - ActorRef leaderShardActor = shardManager.underlyingActor().getContext() - .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId); + String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; + AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); + ActorRef leaderShardActor = shardManager.underlyingActor().getContext() + .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId); - MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString()); + MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString()); - String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.tell( - new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor); - shardManager.tell( - new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION), - mockShardActor); + String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell( + new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor); + shardManager.tell( + new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION), + mockShardActor); - shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); - MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class); + MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class); - Failure resp = expectMsgClass(duration("5 seconds"), Failure.class); - assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); - // Send message again to verify previous in progress state is - // cleared + // Send message again to verify previous in progress state is + // cleared - shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); - resp = expectMsgClass(duration("5 seconds"), Failure.class); - assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); + resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); - // Send message again with an AddServer timeout to verify the - // pre-existing shard actor isn't terminated. + // Send message again with an AddServer timeout to verify the + // pre-existing shard actor isn't terminated. - shardManager.tell( - newDatastoreContextFactory( - datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), - getRef()); - leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender()); - shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); - expectMsgClass(duration("5 seconds"), Failure.class); + shardManager.tell( + newDatastoreContextFactory( + datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef()); + leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender()); + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - } - }; + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending"); } @@ -1688,27 +1591,24 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testAddShardReplicaWithPreExistingLocalReplicaLeader() { LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting"); - new TestKit(getSystem()) { - { - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), getRef()); - shardManager.tell( - new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardActor); - - shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); - Failure resp = expectMsgClass(duration("5 seconds"), Failure.class); - assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); - - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - } - }; + final TestKit kit = new TestKit(getSystem()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), kit.getRef()); + shardManager.tell( + new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor); + + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); + Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending"); } @@ -1716,47 +1616,43 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testAddShardReplicaWithAddServerReplyFailure() { LOG.info("testAddShardReplicaWithAddServerReplyFailure starting"); - new TestKit(getSystem()) { - { - TestKit mockShardLeaderKit = new TestKit(getSystem()); + final TestKit kit = new TestKit(getSystem()); + final TestKit mockShardLeaderKit = new TestKit(getSystem()); - MockConfiguration mockConfig = new MockConfiguration( - ImmutableMap.of("astronauts", Arrays.asList("member-2"))); + MockConfiguration mockConfig = new MockConfiguration( + ImmutableMap.of("astronauts", Arrays.asList("member-2"))); - ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1"); - final TestActorRef shardManager = actorFactory.createTestActor( - newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); - shardManager.underlyingActor() - .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1"); + final TestActorRef shardManager = actorFactory.createTestActor( + newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - TestKit terminateWatcher = new TestKit(getSystem()); - terminateWatcher.watch(mockNewReplicaShardActor); + TestKit terminateWatcher = new TestKit(getSystem()); + terminateWatcher.watch(mockNewReplicaShardActor); - shardManager.tell(new AddShardReplica("astronauts"), getRef()); + shardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); - AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class); - assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix, - addServerMsg.getNewServerId()); - mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null)); + AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class); + assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix, + addServerMsg.getNewServerId()); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null)); - Failure failure = expectMsgClass(duration("5 seconds"), Failure.class); - assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass()); + Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass()); - shardManager.tell(new FindLocalShard("astronauts", false), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class); - terminateWatcher.expectTerminated(mockNewReplicaShardActor); + terminateWatcher.expectTerminated(mockNewReplicaShardActor); - shardManager.tell(new AddShardReplica("astronauts"), getRef()); - mockShardLeaderKit.expectMsgClass(AddServer.class); - mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null)); - failure = expectMsgClass(duration("5 seconds"), Failure.class); - assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); - } - }; + shardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); + mockShardLeaderKit.expectMsgClass(AddServer.class); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null)); + failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); LOG.info("testAddShardReplicaWithAddServerReplyFailure ending"); } @@ -1771,41 +1667,34 @@ public class ShardManagerTest extends AbstractShardManagerTest { public void testAddShardReplicaWithFindPrimaryTimeout() { LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting"); datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS); - new TestKit(getSystem()) { - { - MockConfiguration mockConfig = new MockConfiguration( - ImmutableMap.of("astronauts", Arrays.asList("member-2"))); - - final ActorRef newReplicaShardManager = actorFactory - .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); - - newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", - AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString()); - - newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); - Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException); - } - }; + final TestKit kit = new TestKit(getSystem()); + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2"))); + + final ActorRef newReplicaShardManager = actorFactory + .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); + + newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", + AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString()); + + newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); + Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException); LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending"); } @Test public void testRemoveShardReplicaForNonExistentShard() { - new TestKit(getSystem()) { - { - ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) - .withDispatcher(Dispatchers.DefaultDispatcherId())); - - shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef()); - Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException); - } - }; + final TestKit kit = new TestKit(getSystem()); + ActorRef shardManager = actorFactory + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef()); + Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException); } @Test @@ -1813,31 +1702,28 @@ public class ShardManagerTest extends AbstractShardManagerTest { * Primary is Local. */ public void testRemoveShardReplicaLocal() { - new TestKit(getSystem()) { - { - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - - final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class, - RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId); - - ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); - - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), respondActor); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), getRef()); - shardManager.tell( - new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), - respondActor); - - shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef()); - final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, - RemoveServer.class); - assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(), - removeServer.getServerId()); - expectMsgClass(duration("5 seconds"), Success.class); - } - }; + final TestKit kit = new TestKit(getSystem()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class, + RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), kit.getRef()); + shardManager.tell( + new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), + respondActor); + + shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef()); + final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, + RemoveServer.class); + assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(), + removeServer.getServerId()); + kit.expectMsgClass(kit.duration("5 seconds"), Success.class); } @Test @@ -1893,42 +1779,39 @@ public class ShardManagerTest extends AbstractShardManagerTest { LOG.error("Forwarding actor : {}", actorRef); - new TestKit(system1) { - { - newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - - leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - - short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; - leaderShardManager.tell( - new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), - mockShardLeaderActor); - leaderShardManager.tell( - new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), - mockShardLeaderActor); - - String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; - newReplicaShardManager.tell( - new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion), - mockShardActor); - newReplicaShardManager.tell( - new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), - mockShardActor); - - newReplicaShardManager.underlyingActor().waitForMemberUp(); - leaderShardManager.underlyingActor().waitForMemberUp(); - - // construct a mock response message - newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef()); - RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, - RemoveServer.class); - String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); - assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); - expectMsgClass(duration("5 seconds"), Status.Success.class); - } - }; + final TestKit kit = new TestKit(getSystem()); + newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell( + new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), + mockShardLeaderActor); + leaderShardManager.tell( + new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardLeaderActor); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + newReplicaShardManager.tell( + new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion), + mockShardActor); + newReplicaShardManager.tell( + new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + // construct a mock response message + newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef()); + RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + RemoveServer.class); + String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); + assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); + kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class); } @Test @@ -1947,65 +1830,58 @@ public class ShardManagerTest extends AbstractShardManagerTest { public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange, final Class firstForwardedServerChangeClass, final Object secondServerChange) { - new TestKit(getSystem()) { - { - TestKit mockShardLeaderKit = new TestKit(getSystem()); - final TestKit secondRequestKit = new TestKit(getSystem()); + final TestKit kit = new TestKit(getSystem()); + final TestKit mockShardLeaderKit = new TestKit(getSystem()); + final TestKit secondRequestKit = new TestKit(getSystem()); - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() - .put(shardName, Arrays.asList("member-2")).build()); + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() + .put(shardName, Arrays.asList("member-2")).build()); - final TestActorRef shardManager = TestActorRef.create(getSystem(), - newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor) - .cluster(new MockClusterWrapper()).props() - .withDispatcher(Dispatchers.DefaultDispatcherId()), - shardMgrID); + final TestActorRef shardManager = TestActorRef.create(getSystem(), + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor) + .cluster(new MockClusterWrapper()).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), + shardMgrID); - shardManager.underlyingActor() - .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(firstServerChange, getRef()); + shardManager.tell(firstServerChange, kit.getRef()); - mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass); + mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass); - shardManager.tell(secondServerChange, secondRequestKit.getRef()); + shardManager.tell(secondServerChange, secondRequestKit.getRef()); - secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); - } - }; + secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class); } @Test public void testServerRemovedShardActorNotRunning() { LOG.info("testServerRemovedShardActorNotRunning starting"); - new TestKit(getSystem()) { - { - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() - .put("default", Arrays.asList("member-1", "member-2")) - .put("astronauts", Arrays.asList("member-2")) - .put("people", Arrays.asList("member-1", "member-2")).build()); - - TestActorRef shardManager = actorFactory.createTestActor( - newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); - - shardManager.underlyingActor().waitForRecoveryComplete(); - shardManager.tell(new FindLocalShard("people", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - - shardManager.tell(new FindLocalShard("default", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - - // Removed the default shard replica from member-1 - ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); - ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix) - .build(); - shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); - - shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); - } - }; + final TestKit kit = new TestKit(getSystem()); + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() + .put("default", Arrays.asList("member-1", "member-2")) + .put("astronauts", Arrays.asList("member-2")) + .put("people", Arrays.asList("member-1", "member-2")).build()); + + TestActorRef shardManager = actorFactory.createTestActor( + newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.underlyingActor().waitForRecoveryComplete(); + shardManager.tell(new FindLocalShard("people", false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + + shardManager.tell(new FindLocalShard("default", false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + + // Removed the default shard replica from member-1 + ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); + ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix) + .build(); + shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef()); + + shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); LOG.info("testServerRemovedShardActorNotRunning ending"); } @@ -2013,36 +1889,33 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testServerRemovedShardActorRunning() { LOG.info("testServerRemovedShardActorRunning starting"); - new TestKit(getSystem()) { - { - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() - .put("default", Arrays.asList("member-1", "member-2")) - .put("astronauts", Arrays.asList("member-2")) - .put("people", Arrays.asList("member-1", "member-2")).build()); + final TestKit kit = new TestKit(getSystem()); + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() + .put("default", Arrays.asList("member-1", "member-2")) + .put("astronauts", Arrays.asList("member-2")) + .put("people", Arrays.asList("member-1", "member-2")).build()); - String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); - ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId); + String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); + ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId); - TestActorRef shardManager = actorFactory - .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props() - .withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef shardManager = actorFactory + .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props() + .withDispatcher(Dispatchers.DefaultDispatcherId())); - shardManager.underlyingActor().waitForRecoveryComplete(); + shardManager.underlyingActor().waitForRecoveryComplete(); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), shard); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), shard); - waitForShardInitialized(shardManager, "people", this); - waitForShardInitialized(shardManager, "default", this); + waitForShardInitialized(shardManager, "people", kit); + waitForShardInitialized(shardManager, "default", kit); - // Removed the default shard replica from member-1 - shardManager.tell(new ServerRemoved(shardId), getRef()); + // Removed the default shard replica from member-1 + shardManager.tell(new ServerRemoved(shardId), kit.getRef()); - shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); + shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); - MessageCollectorActor.expectFirstMatching(shard, Shutdown.class); - } - }; + MessageCollectorActor.expectFirstMatching(shard, Shutdown.class); LOG.info("testServerRemovedShardActorRunning ending"); } @@ -2050,39 +1923,36 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testShardPersistenceWithRestoredData() { LOG.info("testShardPersistenceWithRestoredData starting"); - new TestKit(getSystem()) { - { - MockConfiguration mockConfig = - new MockConfiguration(ImmutableMap.>builder() - .put("default", Arrays.asList("member-1", "member-2")) - .put("astronauts", Arrays.asList("member-2")) - .put("people", Arrays.asList("member-1", "member-2")).build()); - String[] restoredShards = {"default", "astronauts"}; - ShardManagerSnapshot snapshot = - new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); - InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); - - // create shardManager to come up with restored data - TestActorRef newRestoredShardManager = actorFactory.createTestActor( - newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); - - newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); - - newRestoredShardManager.tell(new FindLocalShard("people", false), getRef()); - LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - assertEquals("for uninitialized shard", "people", notFound.getShardName()); - - // Verify a local shard is created for the restored shards, - // although we expect a NotInitializedException for the shards - // as the actor initialization - // message is not sent for them - newRestoredShardManager.tell(new FindLocalShard("default", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - - newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - } - }; + final TestKit kit = new TestKit(getSystem()); + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder() + .put("default", Arrays.asList("member-1", "member-2")) + .put("astronauts", Arrays.asList("member-2")) + .put("people", Arrays.asList("member-1", "member-2")).build()); + String[] restoredShards = {"default", "astronauts"}; + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); + InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); + + // create shardManager to come up with restored data + TestActorRef newRestoredShardManager = actorFactory.createTestActor( + newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); + + newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); + + newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef()); + LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class); + assertEquals("for uninitialized shard", "people", notFound.getShardName()); + + // Verify a local shard is created for the restored shards, + // although we expect a NotInitializedException for the shards + // as the actor initialization + // message is not sent for them + newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + + newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef()); + kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); LOG.info("testShardPersistenceWithRestoredData ending"); } @@ -2090,110 +1960,99 @@ public class ShardManagerTest extends AbstractShardManagerTest { @Test public void testShutDown() throws Exception { LOG.info("testShutDown starting"); - new TestKit(getSystem()) { - { - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() - .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build()); + final TestKit kit = new TestKit(getSystem()); + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() + .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build()); - String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString(); - ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1); + String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString(); + ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1); - String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString(); - ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2); + String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString(); + ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2); - ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig) - .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props()); + ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig) + .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props()); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), shard1); - shardManager.tell(new ActorInitialized(), shard2); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), shard1); + shardManager.tell(new ActorInitialized(), shard2); - FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - Future stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE); + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE); - MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class); - MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class); + MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class); + MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class); - try { - Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS)); - fail("ShardManager actor stopped without waiting for the Shards to be stopped"); - } catch (TimeoutException e) { - // expected - } + try { + Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + fail("ShardManager actor stopped without waiting for the Shards to be stopped"); + } catch (TimeoutException e) { + // expected + } - actorFactory.killActor(shard1, this); - actorFactory.killActor(shard2, this); + actorFactory.killActor(shard1, kit); + actorFactory.killActor(shard2, kit); - Boolean stopped = Await.result(stopFuture, duration); - assertEquals("Stopped", Boolean.TRUE, stopped); - } - }; + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); LOG.info("testShutDown ending"); } @Test public void testChangeServersVotingStatus() { - new TestKit(getSystem()) { - { - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - - ActorRef respondActor = actorFactory - .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, - new ServerChangeReply(ServerChangeStatus.OK, null)), memberId); - - ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); - - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), respondActor); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), - DataStoreVersions.CURRENT_VERSION), getRef()); - shardManager.tell( - new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), - respondActor); - - shardManager.tell( - new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), - getRef()); - - ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor - .expectFirstMatching(respondActor, ChangeServersVotingStatus.class); - assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(), - ImmutableMap.of(ShardIdentifier - .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(), - Boolean.TRUE)); - - expectMsgClass(duration("5 seconds"), Success.class); - } - }; + final TestKit kit = new TestKit(getSystem()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + ActorRef respondActor = actorFactory + .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + new ServerChangeReply(ServerChangeStatus.OK, null)), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), kit.getRef()); + shardManager.tell( + new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), + respondActor); + + shardManager.tell( + new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef()); + + ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor + .expectFirstMatching(respondActor, ChangeServersVotingStatus.class); + assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(), + ImmutableMap.of(ShardIdentifier + .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(), + Boolean.TRUE)); + + kit.expectMsgClass(kit.duration("5 seconds"), Success.class); } @Test public void testChangeServersVotingStatusWithNoLeader() { - new TestKit(getSystem()) { - { - String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + final TestKit kit = new TestKit(getSystem()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - ActorRef respondActor = actorFactory - .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, - new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId); + ActorRef respondActor = actorFactory + .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId); - ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); - shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), getRef()); - shardManager.tell(new ActorInitialized(), respondActor); - shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor); + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor); - shardManager.tell( - new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), - getRef()); + shardManager.tell( + new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef()); - MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class); + MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class); - Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); - assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException); - } - }; + Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class); + assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException); } public static class TestShardManager extends ShardManager { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index a1ff319c36..2b8b2bb528 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -141,89 +142,66 @@ public class ActorContextTest extends AbstractActorTest { @Test public void testFindLocalShardWithShardFound() { - new TestKit(getSystem()) { - { - within(duration("1 seconds"), () -> { - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + final TestKit testKit = new TestKit(getSystem()); + testKit.within(testKit.duration("1 seconds"), () -> { + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); - ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, - mock(ClusterWrapper.class), mock(Configuration.class)); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - Optional out = actorContext.findLocalShard("default"); + Optional out = actorContext.findLocalShard("default"); - assertEquals(shardActorRef, out.get()); - - expectNoMessage(); - return null; - }); - } - }; + assertEquals(shardActorRef, out.get()); + testKit.expectNoMessage(); + return null; + }); } @Test public void testFindLocalShardWithShardNotFound() { - new TestKit(getSystem()) { - { - ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null)); - - ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, - mock(ClusterWrapper.class), mock(Configuration.class)); + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null)); - Optional out = actorContext.findLocalShard("default"); - assertTrue(!out.isPresent()); - } - }; + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class), + mock(Configuration.class)); + Optional out = actorContext.findLocalShard("default"); + assertFalse(out.isPresent()); } @Test public void testExecuteRemoteOperation() { - new TestKit(getSystem()) { - { - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); - ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, - mock(ClusterWrapper.class), mock(Configuration.class)); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Object out = actorContext.executeOperation(actor, "hello"); + Object out = actorContext.executeOperation(actor, "hello"); - assertEquals("hello", out); - } - }; + assertEquals("hello", out); } @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testExecuteRemoteOperationAsync() { - new TestKit(getSystem()) { - { - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + public void testExecuteRemoteOperationAsync() throws Exception { + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); - ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, - mock(ClusterWrapper.class), mock(Configuration.class)); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mock(Configuration.class)); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Future future = actorContext.executeOperationAsync(actor, "hello"); + Future future = actorContext.executeOperationAsync(actor, "hello"); - try { - Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - assertEquals("Result", "hello", result); - } catch (Exception e) { - throw new AssertionError(e); - } - } - }; + Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); + assertEquals("Result", "hello", result); } @Test @@ -313,34 +291,31 @@ public class ActorContextTest extends AbstractActorTest { @Test public void testSetDatastoreContext() { - new TestKit(getSystem()) { - { - ActorContext actorContext = new ActorContext(getSystem(), getRef(), - mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder() - .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), - new PrimaryShardInfoFutureCache()); + final TestKit testKit = new TestKit(getSystem()); + ActorContext actorContext = new ActorContext(getSystem(), testKit.getRef(), + mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder() + .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), + new PrimaryShardInfoFutureCache()); - assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); - assertEquals("getTransactionCommitOperationTimeout", 7, - actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); - DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6) - .shardTransactionCommitTimeoutInSeconds(8).build(); + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6) + .shardTransactionCommitTimeoutInSeconds(8).build(); - DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); - Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext(); + DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext(); - actorContext.setDatastoreContext(mockContextFactory); + actorContext.setDatastoreContext(mockContextFactory); - expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class); + testKit.expectMsgClass(testKit.duration("5 seconds"), DatastoreContextFactory.class); - Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); - assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); - assertEquals("getTransactionCommitOperationTimeout", 8, - actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); - } - }; + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); } @Test @@ -471,34 +446,30 @@ public class ActorContextTest extends AbstractActorTest { @Test public void testBroadcast() { - new TestKit(getSystem()) { - { - ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props()); - ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props()); - - TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), - MockShardManager.props()); - MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound( - shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION)); - shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound( - shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION)); - shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); - - Configuration mockConfig = mock(Configuration.class); - doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig) - .getAllShardNames(); - - ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, - mock(ClusterWrapper.class), mockConfig, - DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), - new PrimaryShardInfoFutureCache()); - - actorContext.broadcast(v -> new TestMessage(), TestMessage.class); - - MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class); - MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class); - } - }; + ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props()); + ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props()); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), + MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound( + shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound( + shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig) + .getAllShardNames(); + + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), + new PrimaryShardInfoFutureCache()); + + actorContext.broadcast(v -> new TestMessage(), TestMessage.class); + + MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class); + MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java index ad8a69f79e..154a7d4d42 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.sharding; import static akka.actor.ActorRef.noSender; @@ -30,15 +29,12 @@ public class RoleChangeListenerActorTest extends AbstractActorTest { @Test public void testRegisterRoleChangeListenerOnStart() { - new TestKit(getSystem()) { - { - final LeaderLocationListener listener = mock(LeaderLocationListener.class); - final Props props = RoleChangeListenerActor.props(getRef(), listener); - - getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart"); - expectMsgClass(RegisterRoleChangeListener.class); - } - }; + final TestKit testKit = new TestKit(getSystem()); + final LeaderLocationListener listener = mock(LeaderLocationListener.class); + final Props props = RoleChangeListenerActor.props(testKit.getRef(), listener); + + getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart"); + testKit.expectMsgClass(RegisterRoleChangeListener.class); } @Test @@ -57,6 +53,5 @@ public class RoleChangeListenerActorTest extends AbstractActorTest { subject.tell(new LeaderStateChanged("member-1", "member-2", (short) 0), noSender()); verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.REMOTE)); - } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index 18e4348625..ddf0ba3dba 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -5,18 +5,16 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.when; import akka.actor.Status.Failure; -import akka.testkit.javadsl.TestKit; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.mdsal.dom.api.DOMRpcException; @@ -25,47 +23,38 @@ import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class RpcBrokerTest extends AbstractRpcTest { @Test public void testExecuteRpc() { - new TestKit(node1) { - { - - final ContainerNode invokeRpcResult = makeRPCOutput("bar"); - final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult); - when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.>any())).thenReturn( - FluentFutures.immediateFluentFuture(rpcResult)); + final ContainerNode invokeRpcResult = makeRPCOutput("bar"); + final DOMRpcResult rpcResult = new DefaultDOMRpcResult(invokeRpcResult); + when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), any())).thenReturn( + FluentFutures.immediateFluentFuture(rpcResult)); - final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); + final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); - rpcInvoker1.tell(executeMsg, getRef()); + rpcInvoker1.tell(executeMsg, rpcRegistry1Probe.getRef()); - final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); + final RpcResponse rpcResponse = rpcRegistry1Probe.expectMsgClass(rpcRegistry1Probe.duration("5 seconds"), + RpcResponse.class); - assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode()); - } - }; + assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode()); } @Test public void testExecuteRpcFailureWithException() { - new TestKit(node1) { - { - when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.>any())) - .thenReturn(FluentFutures.immediateFailedFluentFuture( - new DOMRpcImplementationNotAvailableException("NOT FOUND"))); + when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), any())).thenReturn(FluentFutures.immediateFailedFluentFuture( + new DOMRpcImplementationNotAvailableException("NOT FOUND"))); - final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); + final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null); - rpcInvoker1.tell(executeMsg, getRef()); + rpcInvoker1.tell(executeMsg, rpcRegistry1Probe.getRef()); - final Failure rpcResponse = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + final Failure rpcResponse = rpcRegistry1Probe.expectMsgClass(rpcRegistry1Probe.duration("5 seconds"), + Failure.class); - Assert.assertTrue(rpcResponse.cause() instanceof DOMRpcException); - } - }; + assertTrue(rpcResponse.cause() instanceof DOMRpcException); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java index 8b1892eb2b..7ae8a0a57f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java @@ -29,47 +29,39 @@ public class RpcListenerTest { private static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier .create(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)); private static final DOMRpcIdentifier RPC_ID = DOMRpcIdentifier.create(RPC_TYPE, TEST_PATH); - static ActorSystem system; + private static ActorSystem SYSTEM; @BeforeClass public static void setup() { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); + SYSTEM = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); } @AfterClass public static void teardown() { - TestKit.shutdownActorSystem(system); - system = null; + TestKit.shutdownActorSystem(SYSTEM); + SYSTEM = null; } @Test public void testRouteAdd() { - new TestKit(system) { - { - // Test announcements - final TestKit probeReg = new TestKit(system); - final ActorRef rpcRegistry = probeReg.getRef(); + // Test announcements + final TestKit probeReg = new TestKit(SYSTEM); + final ActorRef rpcRegistry = probeReg.getRef(); - final RpcListener rpcListener = new RpcListener(rpcRegistry); - rpcListener.onRpcAvailable(Collections.singleton(RPC_ID)); - probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); - } - }; + final RpcListener rpcListener = new RpcListener(rpcRegistry); + rpcListener.onRpcAvailable(Collections.singleton(RPC_ID)); + probeReg.expectMsgClass(RpcRegistry.Messages.AddOrUpdateRoutes.class); } @Test public void testRouteRemove() { - new TestKit(system) { - { - // Test announcements - final TestKit probeReg = new TestKit(system); - final ActorRef rpcRegistry = probeReg.getRef(); + // Test announcements + final TestKit probeReg = new TestKit(SYSTEM); + final ActorRef rpcRegistry = probeReg.getRef(); - final RpcListener rpcListener = new RpcListener(rpcRegistry); - rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID)); - probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); - } - }; + final RpcListener rpcListener = new RpcListener(rpcRegistry); + rpcListener.onRpcUnavailable(Collections.singleton(RPC_ID)); + probeReg.expectMsgClass(RpcRegistry.Messages.RemoveRoutes.class); } } -- 2.36.6