BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
index 2072af68d42b30775a68241eba6d05b1a05c2607..3257e8f910e0538ec99bbd0f608daf2b5056e1bd 100644 (file)
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
+import akka.actor.Terminated;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
@@ -104,6 +105,7 @@ import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -331,7 +333,7 @@ public class ShardManagerTest extends AbstractActorTest {
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
-            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
         }};
     }
 
@@ -432,7 +434,7 @@ public class ShardManagerTest extends AbstractActorTest {
             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
-            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
         }};
     }
 
@@ -1449,6 +1451,75 @@ public class ShardManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testServerRemovedShardActorNotRunning() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("default", Arrays.asList("member-1", "member-2")).
+                            put("astronauts", Arrays.asList("member-2")).
+                            put("people", Arrays.asList("member-1", "member-2")).build());
+
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+
+            shardManager.tell(new FindLocalShard("people", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            shardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            // Removed the default shard replica from member-1
+            ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+            final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+            shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+            shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+        }};
+    }
+
+    @Test
+    public void testServerRemovedShardActorRunning() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                            put("default", Arrays.asList("member-1", "member-2")).
+                            put("astronauts", Arrays.asList("member-2")).
+                            put("people", Arrays.asList("member-1", "member-2")).build());
+
+            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newShardMgrProps(mockConfig));
+
+            TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+            watch(shard);
+
+            shardManager.underlyingActor().waitForRecoveryComplete();
+
+            shardManager.underlyingActor().addShardActor("default", shard);
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager.tell(new FindLocalShard("people", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            shardManager.tell(new FindLocalShard("default", false), getRef());
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
+
+            // Removed the default shard replica from member-1
+            ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
+            final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
+            shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
+
+            shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
+
+            expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
+
     @Test
     public void testShardPersistenceWithRestoredData() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -1485,6 +1556,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
     private static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final CountDownLatch snapshotPersist = new CountDownLatch(1);
+        private ShardManagerSnapshot snapshot;
+        private Map<String, ActorRef> shardActors = new HashMap<>();
 
         private TestShardManager(Builder builder) {
             super(builder);
@@ -1523,6 +1597,30 @@ public class ShardManagerTest extends AbstractActorTest {
                 return Props.create(TestShardManager.class, this);
             }
         }
+
+        @Override
+        public void saveSnapshot(Object obj) {
+            snapshot = (ShardManagerSnapshot) obj;
+            snapshotPersist.countDown();
+        }
+
+        void verifySnapshotPersisted(Set<String> shardList) {
+            assertEquals("saveSnapshot invoked", true,
+                    Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
+            assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
+        }
+
+        @Override
+        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            if(shardActors.get(info.getShardName()) != null){
+                return shardActors.get(info.getShardName());
+            }
+            return super.newShardActor(schemaContext, info);
+        }
+
+        public void addShardActor(String shardName, ActorRef actorRef){
+            shardActors.put(shardName, actorRef);
+        }
     }
 
     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {