Fix intermittent PreLeaderScenarioTest failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
index 7e484ebde3d79bcadbb182162c55c23c56675d78..960e0328d6cab5142d59540df7cbcdafffff49eb 100644 (file)
@@ -17,6 +17,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -31,7 +32,6 @@ import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.util.Arrays;
 import java.util.Map;
@@ -55,6 +55,9 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.raft.utils.EchoActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,9 +66,9 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-public class ActorContextTest extends AbstractActorTest{
+public class ActorContextTest extends AbstractActorTest {
 
-    static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
+    static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
 
     private static class TestMessage {
     }
@@ -76,18 +79,18 @@ public class ActorContextTest extends AbstractActorTest{
         private final ActorRef actorRef;
         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
 
-        private MockShardManager(boolean found, ActorRef actorRef){
+        private MockShardManager(final boolean found, final ActorRef actorRef) {
 
             this.found = found;
             this.actorRef = actorRef;
         }
 
-        @Override public void onReceive(Object message) throws Exception {
-            if(message instanceof FindPrimary) {
+        @Override public void onReceive(final Object message) throws Exception {
+            if (message instanceof FindPrimary) {
                 FindPrimary fp = (FindPrimary)message;
                 Object resp = findPrimaryResponses.get(fp.getShardName());
-                if(resp == null) {
-                    log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+                if (resp == null) {
+                    LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
                 } else {
                     getSender().tell(resp, getSelf());
                 }
@@ -95,23 +98,23 @@ public class ActorContextTest extends AbstractActorTest{
                 return;
             }
 
-            if(found){
+            if (found) {
                 getSender().tell(new LocalShardFound(actorRef), getSelf());
             } else {
                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
             }
         }
 
-        void addFindPrimaryResp(String shardName, Object resp) {
+        void addFindPrimaryResp(final String shardName, final Object resp) {
             findPrimaryResponses.put(shardName, resp);
         }
 
-        private static Props props(final boolean found, final ActorRef actorRef){
-            return Props.create(new MockShardManagerCreator(found, actorRef) );
+        private static Props props(final boolean found, final ActorRef actorRef) {
+            return Props.create(new MockShardManagerCreator(found, actorRef));
         }
 
-        private static Props props(){
-            return Props.create(new MockShardManagerCreator() );
+        private static Props props() {
+            return Props.create(new MockShardManagerCreator());
         }
 
         @SuppressWarnings("serial")
@@ -124,7 +127,7 @@ public class ActorContextTest extends AbstractActorTest{
                 this.actorRef = null;
             }
 
-            MockShardManagerCreator(boolean found, ActorRef actorRef) {
+            MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
                 this.found = found;
                 this.actorRef = actorRef;
             }
@@ -137,93 +140,93 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testFindLocalShardWithShardFound(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+    public void testFindLocalShardWithShardFound() {
+        new JavaTestKit(getSystem()) {
+            {
+                new Within(duration("1 seconds")) {
+                    @Override
+                    protected void run() {
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
+                        ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+                        ActorRef shardManagerActorRef = getSystem()
+                                .actorOf(MockShardManager.props(true, shardActorRef));
 
-                    Optional<ActorRef> out = actorContext.findLocalShard("default");
+                        ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                                mock(ClusterWrapper.class), mock(Configuration.class));
 
-                    assertEquals(shardActorRef, out.get());
+                        Optional<ActorRef> out = actorContext.findLocalShard("default");
 
+                        assertEquals(shardActorRef, out.get());
 
-                    expectNoMsg();
-                }
-            };
-        }};
+                        expectNoMsg();
+                    }
+                };
+            }
+        };
 
     }
 
     @Test
-    public void testFindLocalShardWithShardNotFound(){
-        new JavaTestKit(getSystem()) {{
-            ActorRef shardManagerActorRef = getSystem()
-                    .actorOf(MockShardManager.props(false, null));
+    public void testFindLocalShardWithShardNotFound() {
+        new JavaTestKit(getSystem()) {
+            {
+                ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
 
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                        mock(ClusterWrapper.class), mock(Configuration.class));
 
-            Optional<ActorRef> out = actorContext.findLocalShard("default");
-            assertTrue(!out.isPresent());
-        }};
+                Optional<ActorRef> out = actorContext.findLocalShard("default");
+                assertTrue(!out.isPresent());
+            }
+        };
 
     }
 
     @Test
     public void testExecuteRemoteOperation() {
-        new JavaTestKit(getSystem()) {{
-            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+        new JavaTestKit(getSystem()) {
+            {
+                ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-            ActorRef shardManagerActorRef = getSystem()
-                    .actorOf(MockShardManager.props(true, shardActorRef));
+                ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
 
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                        mock(ClusterWrapper.class), mock(Configuration.class));
 
-            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+                ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-            Object out = actorContext.executeOperation(actor, "hello");
+                Object out = actorContext.executeOperation(actor, "hello");
 
-            assertEquals("hello", out);
-        }};
+                assertEquals("hello", out);
+            }
+        };
     }
 
     @Test
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public void testExecuteRemoteOperationAsync() {
-        new JavaTestKit(getSystem()) {{
-            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+        new JavaTestKit(getSystem()) {
+            {
+                ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-            ActorRef shardManagerActorRef = getSystem()
-                    .actorOf(MockShardManager.props(true, shardActorRef));
+                ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
 
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                        mock(ClusterWrapper.class), mock(Configuration.class));
 
-            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+                ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-            Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+                Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
 
-            try {
-                Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
-                assertEquals("Result", "hello", result);
-            } catch(Exception e) {
-                throw new AssertionError(e);
+                try {
+                    Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+                    assertEquals("Result", "hello", result);
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
             }
-        }};
+        };
     }
 
     @Test
@@ -253,13 +256,13 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
 
         // self address of remote format,but Tx path local format.
-        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
         assertEquals(true, actorContext.isPathLocal(
             "akka://system/user/shardmanager/shard/transaction"));
 
         // self address of local format,but Tx path remote format.
-        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+        clusterWrapper.setSelfAddress(new Address("akka", "system"));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
         assertEquals(false, actorContext.isPathLocal(
             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
@@ -270,209 +273,160 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
 
         //ip and port same
-        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+        assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
 
         // forward-slash missing in address
-        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+        assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
 
         //ips differ
-        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+        assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
 
         //ports differ
-        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
-    }
-
-    @Test
-    public void testResolvePathForRemoteActor() {
-        ActorContext actorContext =
-                new ActorContext(getSystem(), mock(ActorRef.class), mock(
-                        ClusterWrapper.class),
-                        mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-                "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
-                "akka://system/user/shardmanager/shard/transaction");
-
-        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testResolvePathForLocalActor() {
-        ActorContext actorContext =
-                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-                "akka://system/user/shardmanager/shard",
-                "akka://system/user/shardmanager/shard/transaction");
-
-        String expected = "akka://system/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testResolvePathForRemoteActorWithProperRemoteAddress() {
-        ActorContext actorContext =
-                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
-                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
-
-        String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
+        assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
     }
 
-
     @Test
-    public void testClientDispatcherIsGlobalDispatcher(){
-        ActorContext actorContext =
-                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
+    public void testClientDispatcherIsGlobalDispatcher() {
+        ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
 
         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
-
     }
 
     @Test
-    public void testClientDispatcherIsNotGlobalDispatcher(){
-        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+    public void testClientDispatcherIsNotGlobalDispatcher() {
+        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
+                ConfigFactory.load("application-with-custom-dispatchers.conf"));
 
-        ActorContext actorContext =
-                new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
+        ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+                mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
 
         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
 
-        actorSystem.shutdown();
-
+        actorSystem.terminate();
     }
 
     @Test
     public void testSetDatastoreContext() {
-        new JavaTestKit(getSystem()) {{
-            ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
-                            mock(Configuration.class), DatastoreContext.newBuilder().
-                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
+        new JavaTestKit(getSystem()) {
+            {
+                ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                        mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
+                                .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
+                        new PrimaryShardInfoFutureCache());
 
-            assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
-            assertEquals("getTransactionCommitOperationTimeout", 7,
-                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+                assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+                assertEquals("getTransactionCommitOperationTimeout", 7,
+                        actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
 
-            DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
-                    shardTransactionCommitTimeoutInSeconds(8).build();
+                DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
+                        .shardTransactionCommitTimeoutInSeconds(8).build();
 
-            DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
-            Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
+                DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+                Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
 
-            actorContext.setDatastoreContext(mockContextFactory);
+                actorContext.setDatastoreContext(mockContextFactory);
 
-            expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
+                expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
 
-            Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+                Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
 
-            assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
-            assertEquals("getTransactionCommitOperationTimeout", 8,
-                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
-        }};
+                assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+                assertEquals("getTransactionCommitOperationTimeout", 8,
+                        actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+            }
+        };
     }
 
     @Test
     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
 
-            TestActorRef<MessageCollectorActor> shardManager =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+        ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
 
-            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+                .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+                .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
-            final String expPrimaryPath = "akka://test-system/find-primary-shard";
-            final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
-                        @Override
-                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
-                        }
-                    };
+        final String expPrimaryPath = "akka://test-system/find-primary-shard";
+        final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
+        ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+            @Override
+            protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+                return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
+            }
+        };
 
-            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+        Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+        PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
-            assertNotNull(actual);
-            assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
-            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
-                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
-            assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
+        assertNotNull(actual);
+        assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+        assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+        assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
 
-            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+        Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+        PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
 
-            assertEquals(cachedInfo, actual);
+        assertEquals(cachedInfo, actual);
 
-            actorContext.getPrimaryShardInfoCache().remove("foobar");
+        actorContext.getPrimaryShardInfoCache().remove("foobar");
 
-            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+        cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            assertNull(cached);
+        assertNull(cached);
     }
 
     @Test
     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
 
-            TestActorRef<MessageCollectorActor> shardManager =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+        ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
 
-            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+                .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+                .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
-            final DataTree mockDataTree = Mockito.mock(DataTree.class);
-            final String expPrimaryPath = "akka://test-system/find-primary-shard";
-            ActorContext actorContext =
-                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
-                            mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
-                        @Override
-                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
-                        }
-                    };
+        final DataTree mockDataTree = Mockito.mock(DataTree.class);
+        final String expPrimaryPath = "akka://test-system/find-primary-shard";
+        ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+            @Override
+            protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+                return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+            }
+        };
 
-            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+        Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+        PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
-            assertNotNull(actual);
-            assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
-            assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
-            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
-                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
-            assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
+        assertNotNull(actual);
+        assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+        assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+        assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+        assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
 
-            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+        Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+        PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
 
-            assertEquals(cachedInfo, actual);
+        assertEquals(cachedInfo, actual);
 
-            actorContext.getPrimaryShardInfoCache().remove("foobar");
+        actorContext.getPrimaryShardInfoCache().remove("foobar");
 
-            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+        cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            assertNull(cached);
+        assertNull(cached);
     }
 
     @Test
@@ -485,18 +439,19 @@ public class ActorContextTest extends AbstractActorTest{
         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
     }
 
-    private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
-        TestActorRef<MessageCollectorActor> shardManager =
-            TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+        ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-            shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+                .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+                .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
         ActorContext actorContext =
             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                 @Override
-                protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
                     return Futures.successful(expectedException);
                 }
             };
@@ -506,8 +461,8 @@ public class ActorContextTest extends AbstractActorTest{
         try {
             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
             fail("Expected" + expectedException.getClass().toString());
-        } catch(Exception e){
-            if(!expectedException.getClass().isInstance(e)) {
+        } catch (Exception e) {
+            if (!expectedException.getClass().isInstance(e)) {
                 fail("Expected Exception of type " + expectedException.getClass().toString());
             }
         }
@@ -519,48 +474,34 @@ public class ActorContextTest extends AbstractActorTest{
 
     @Test
     public void testBroadcast() {
-        new JavaTestKit(getSystem()) {{
-            ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-            ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
-            TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
-            MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
-                    DataStoreVersions.CURRENT_VERSION));
-            shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
-                    DataStoreVersions.CURRENT_VERSION));
-            shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
-
-            Configuration mockConfig = mock(Configuration.class);
-            doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
-                    when(mockConfig).getAllShardNames();
-
-            ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
-                    mock(ClusterWrapper.class), mockConfig,
-                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
-
-            actorContext.broadcast(new TestMessage());
-
-            expectFirstMatching(shardActorRef1, TestMessage.class);
-            expectFirstMatching(shardActorRef2, TestMessage.class);
-        }};
-    }
-
-    private static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
-        int count = 5000 / 50;
-        for(int i = 0; i < count; i++) {
-            try {
-                @SuppressWarnings("unchecked")
-                T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
-                if(message != null) {
-                    return message;
-                }
-            } catch (Exception e) {}
-
-            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-        }
-
-        Assert.fail("Did not receive message of type " + clazz);
-        return null;
+        new JavaTestKit(getSystem()) {
+            {
+                ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
+                ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
+
+                TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
+                        MockShardManager.props());
+                MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+                shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
+                        shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
+                shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
+                        shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
+                shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+                Configuration mockConfig = mock(Configuration.class);
+                doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
+                        .getAllShardNames();
+
+                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                        mock(ClusterWrapper.class), mockConfig,
+                        DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
+                        new PrimaryShardInfoFutureCache());
+
+                actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
+
+                MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+                MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
+            }
+        };
     }
 }