Bug -3221 : Adding a new DataStoreUnavailableException for external applications.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
index 5f6973f5dbff3cb23d798c6740c1ff6aeae77850..1ffe387a199c3988e2ce214f31b9121167cfc79c 100644 (file)
@@ -424,6 +424,169 @@ public class ShardManagerTest extends AbstractActorTest {
         JavaTestKit.shutdownActorSystem(system2);
     }
 
+    @Test
+    public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForReachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+    @Test
+    public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+            newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+            newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager1.tell(new ActorInitialized(), mockShardActor1);
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
+                Optional.of(mock(DataTree.class))), mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))),
+                mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForUnreachableMember();
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+            shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))),
+                mockShardActor1);
+            shardManager1.tell(new RoleChangeNotification(memberId1,
+                RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
+
+            shardManager1.tell(new FindPrimary("default", true), getRef());
+
+            LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+            String path1 = found1.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
+
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -927,6 +1090,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
+        private CountDownLatch memberReachableReceived = new CountDownLatch(1);
         private final ActorRef shardActor;
         private final String name;
 
@@ -955,6 +1120,16 @@ public class ShardManagerTest extends AbstractActorTest {
                     if(!getCluster().getCurrentMemberName().equals(role)) {
                         memberRemovedReceived.countDown();
                     }
+                } else if(message instanceof ClusterEvent.UnreachableMember) {
+                    String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUnreachableReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.ReachableMember) {
+                    String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberReachableReceived.countDown();
+                    }
                 }
             }
         }
@@ -981,6 +1156,19 @@ public class ShardManagerTest extends AbstractActorTest {
             memberRemovedReceived = new CountDownLatch(1);
         }
 
+        void waitForUnreachableMember() {
+            assertEquals("UnreachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
+                ));
+            memberUnreachableReceived = new CountDownLatch(1);
+        }
+
+        void waitForReachableMember() {
+            assertEquals("ReachableMember received", true,
+                Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
+            memberReachableReceived = new CountDownLatch(1);
+        }
+
         void verifyFindPrimary() {
             assertEquals("FindPrimary received", true,
                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));