Bug -3221 : Adding a new DataStoreUnavailableException for external applications.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
index 645890dcb9c0f058bddbca5d06c4bd6f6bdcabe7..1ffe387a199c3988e2ce214f31b9121167cfc79c 100644 (file)
@@ -60,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
@@ -93,6 +94,8 @@ public class ShardManagerTest extends AbstractActorTest {
         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
     }
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
@@ -112,9 +115,9 @@ public class ShardManagerTest extends AbstractActorTest {
         InMemoryJournal.clear();
     }
 
-    private Props newShardMgrProps() {
+    private Props newShardMgrProps(boolean persistent) {
         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                datastoreContextBuilder.build(), ready);
+                datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -129,7 +132,7 @@ public class ShardManagerTest extends AbstractActorTest {
             @Override
             public ShardManager create() throws Exception {
                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
-                        ready, name, shardActor);
+                        ready, name, shardActor, primaryShardInfoCache);
             }
         };
 
@@ -421,6 +424,169 @@ public class ShardManagerTest extends AbstractActorTest {
         JavaTestKit.shutdownActorSystem(system2);
     }
 
+    @Test
+    public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForReachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+    @Test
+    public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
+                mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -529,7 +695,7 @@ public class ShardManagerTest extends AbstractActorTest {
             throws Exception {
         new JavaTestKit(getSystem()) {{
             final TestActorRef<ShardManager> shardManager =
-                    TestActorRef.create(getSystem(), newShardMgrProps());
+                    TestActorRef.create(getSystem(), newShardMgrProps(true));
 
             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
 
@@ -564,7 +730,7 @@ public class ShardManagerTest extends AbstractActorTest {
             throws Exception {
         new JavaTestKit(getSystem()) {{
             final TestActorRef<ShardManager> shardManager =
-                    TestActorRef.create(getSystem(), newShardMgrProps());
+                    TestActorRef.create(getSystem(), newShardMgrProps(true));
 
             SchemaContext schemaContext = mock(SchemaContext.class);
             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
@@ -601,10 +767,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRecoveryApplicable(){
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build(), ready);
+                final Props persistentProps = newShardMgrProps(true);
                 final TestActorRef<ShardManager> persistentShardManager =
                         TestActorRef.create(getSystem(), persistentProps);
 
@@ -612,10 +775,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
 
-                final Props nonPersistentProps = ShardManager.props(
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(false).build(), ready);
+                final Props nonPersistentProps = newShardMgrProps(false);
                 final TestActorRef<ShardManager> nonPersistentShardManager =
                         TestActorRef.create(getSystem(), nonPersistentProps);
 
@@ -636,7 +796,8 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
+                        ready, new PrimaryShardInfoFutureCache()) {
                     @Override
                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
@@ -674,7 +835,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -694,7 +855,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -716,7 +877,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
@@ -738,7 +899,7 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
 
                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
@@ -751,10 +912,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testByDefaultSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final Props persistentProps = newShardMgrProps(true);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -768,7 +926,7 @@ public class ShardManagerTest extends AbstractActorTest {
         final Props persistentProps = ShardManager.props(
                 new MockClusterWrapper(),
                 new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -781,10 +939,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = ShardManager.props(
-                new MockClusterWrapper(),
-                new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+        final Props persistentProps = newShardMgrProps(true);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -805,7 +960,7 @@ public class ShardManagerTest extends AbstractActorTest {
         final Props persistentProps = ShardManager.props(
                 new MockClusterWrapper(),
                 new MockConfiguration(),
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -838,7 +993,7 @@ public class ShardManagerTest extends AbstractActorTest {
                         return Arrays.asList("default", "astronauts");
                     }
                 },
-                DatastoreContext.newBuilder().persistent(true).build(), ready);
+                DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -881,7 +1036,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
         TestShardManager(String shardMrgIDSuffix) {
             super(new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
+                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
+                    new PrimaryShardInfoFutureCache());
         }
 
         @Override
@@ -934,13 +1090,15 @@ public class ShardManagerTest extends AbstractActorTest {
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
+        private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
 
         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
-                ActorRef shardActor) {
-            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+                ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
             this.shardActor = shardActor;
             this.name = name;
         }
@@ -962,6 +1120,16 @@ public class ShardManagerTest extends AbstractActorTest {
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberRemovedReceived.countDown();
                     }
+                } else if(message instanceof ClusterEvent.UnreachableMember) {
+                    String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUnreachableReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.ReachableMember) {
+                    String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberReachableReceived.countDown();
+                    }
                 }
             }
         }
@@ -988,6 +1156,19 @@ public class ShardManagerTest extends AbstractActorTest {
             memberRemovedReceived = new CountDownLatch(1);
         }
 
+        void waitForUnreachableMember() {
+            assertEquals("UnreachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
+                ));
+            memberUnreachableReceived = new CountDownLatch(1);
+        }
+
+        void waitForReachableMember() {
+            assertEquals("ReachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
+            memberReachableReceived = new CountDownLatch(1);
+        }
+
         void verifyFindPrimary() {
             assertEquals("FindPrimary received", true,
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));