BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 4f4162f607b76c60a22914b74c651626747b61a1..f097c19e512a3766a3d836c5cc7e061862b29666 100644 (file)
@@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
@@ -41,7 +40,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -96,6 +94,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -121,10 +120,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -191,8 +190,7 @@ public class ShardTest extends AbstractShardTest {
                     // this will cause all other messages to not be queued properly after that.
                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
                     // it does do a persist)
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -229,7 +227,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testRegisterChangeListenerWhenNotLeaderInitially");
 
             // Write initial data into the in-memory store.
@@ -310,8 +308,8 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                    return new Shard(Shard.builder().id(shardID).datastoreContext(
+                            dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -340,7 +338,7 @@ public class ShardTest extends AbstractShardTest {
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    Props.create(new DelegatingShardCreator(creator)),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
@@ -423,12 +421,13 @@ public class ShardTest extends AbstractShardTest {
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
                 TestShard() {
-                    super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
-                            newDatastoreContext(), SCHEMA_CONTEXT);
+                    super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
+                            peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
+                            schemaContext(SCHEMA_CONTEXT));
                 }
 
-                Map<String, String> getPeerAddresses() {
-                    return getRaftActorContext().getPeerAddresses();
+                String getPeerAddress(String id) {
+                    return getRaftActorContext().getPeerAddress(id);
                 }
 
                 @Override
@@ -449,15 +448,14 @@ public class ShardTest extends AbstractShardTest {
                         }
                     })), "testPeerAddressResolved");
 
-            //waitUntilLeader(shard);
             assertEquals("Recovery complete", true,
                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             final String address = "akka://foobar";
             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
-            assertEquals("getPeerAddresses", address,
-                ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+            assertEquals("getPeerAddress", address,
+                ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -473,7 +471,7 @@ public class ShardTest extends AbstractShardTest {
 
         testkit.waitUntilLeader(shard);
 
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
 
         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
@@ -544,7 +542,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
-        final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -1101,8 +1099,7 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         protected boolean isLeader() {
                             return overrideLeaderCalls.get() ? false : super.isLeader();
@@ -1184,7 +1181,7 @@ public class ShardTest extends AbstractShardTest {
 
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification = dataStore.newModification();
 
             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@@ -1217,7 +1214,7 @@ public class ShardTest extends AbstractShardTest {
 
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
-            final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification = dataStore.newModification();
 
             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
@@ -2087,7 +2084,7 @@ public class ShardTest extends AbstractShardTest {
             // Ready the third Tx.
 
             final String transactionID3 = "tx3";
-            final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+            final DataTreeModification modification3 = dataStore.newModification();
             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
                     .apply(modification3);
                 modification3.ready();
@@ -2202,8 +2199,7 @@ public class ShardTest extends AbstractShardTest {
             final Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             super.onReceiveCommand(message);
@@ -2298,9 +2294,8 @@ public class ShardTest extends AbstractShardTest {
         new ShardTestKit(getSystem()) {{
             class TestShard extends Shard {
 
-                protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
-                                    final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-                    super(name, peerAddresses, datastoreContext, schemaContext);
+                protected TestShard(AbstractBuilder<?, ?> builder) {
+                    super(builder);
                     setPersistence(new TestPersistentDataProvider(super.persistence()));
                 }
 
@@ -2322,8 +2317,7 @@ public class ShardTest extends AbstractShardTest {
             final Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT);
+                    return new TestShard(newShardBuilder());
                 }
             };
 
@@ -2331,7 +2325,6 @@ public class ShardTest extends AbstractShardTest {
                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
 
             waitUntilLeader(shard);
-
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
@@ -2339,35 +2332,35 @@ public class ShardTest extends AbstractShardTest {
             // Trigger creation of a snapshot by ensuring
             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
-
-            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
-
-            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
-                    savedSnapshot.get() instanceof Snapshot);
-
-            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
-
-            latch.set(new CountDownLatch(1));
-            savedSnapshot.set(null);
+            awaitAndValidateSnapshot(expectedRoot);
 
             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
+            awaitAndValidateSnapshot(expectedRoot);
 
-            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
 
-            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
-                    savedSnapshot.get() instanceof Snapshot);
+            private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
+                                              ) throws InterruptedException {
+                System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
+                assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
-            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+                assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                        savedSnapshot.get() instanceof Snapshot);
 
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-        }
+                verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
 
-        private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+                latch.set(new CountDownLatch(1));
+                savedSnapshot.set(null);
+            }
 
-            final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
-            assertEquals("Root node", expectedRoot, actual);
+            private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
 
-        }};
+                final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+                assertEquals("Root node", expectedRoot, actual);
+
+           }
+        };
     }
 
     /**
@@ -2377,7 +2370,7 @@ public class ShardTest extends AbstractShardTest {
      */
     @Test
     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
-        final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
 
         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
@@ -2406,14 +2399,14 @@ public class ShardTest extends AbstractShardTest {
         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
-        final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-                persistentContext, SCHEMA_CONTEXT);
+        final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
+                schemaContext(SCHEMA_CONTEXT).props();
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
-        final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
-            nonPersistentContext, SCHEMA_CONTEXT);
+        final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
+                schemaContext(SCHEMA_CONTEXT).props();
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
@@ -2526,15 +2519,16 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
+            dataStoreContextBuilder.persistent(false);
             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
             final Creator<Shard> creator = new Creator<Shard>() {
+                private static final long serialVersionUID = 1L;
                 boolean firstElectionTimeout = true;
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                    return new Shard(newShardBuilder()) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -2563,8 +2557,8 @@ public class ShardTest extends AbstractShardTest {
                 "testDataChangeListenerOnFollower-DataChangeListener");
 
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)),
-                "testDataChangeListenerOnFollower");
+                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
+                    withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
 
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
@@ -2594,6 +2588,7 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testClusteredDataChangeListernerRegistration() throws Exception {
+        dataStoreContextBuilder.persistent(false).build();
         new ShardTestKit(getSystem()) {{
             final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
                 .shardName("inventory").type("config").build();
@@ -2601,12 +2596,13 @@ public class ShardTest extends AbstractShardTest {
             final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
                 .shardName("inventory").type("config").build();
             final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+                private static final long serialVersionUID = 1L;
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
-                        "akka://test/user/" + member2ShardID.toString()),
-                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+                    return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
+                            peerAddresses(Collections.singletonMap(member2ShardID.toString(),
+                                    "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
 
@@ -2619,12 +2615,13 @@ public class ShardTest extends AbstractShardTest {
             };
 
             final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+                private static final long serialVersionUID = 1L;
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
-                        "akka://test/user/" + member1ShardID.toString()),
-                        dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
+                    return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
+                            peerAddresses(Collections.singletonMap(member1ShardID.toString(),
+                                    "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
                 }
             };
 
@@ -2634,7 +2631,7 @@ public class ShardTest extends AbstractShardTest {
                 member1ShardID.toString());
 
             final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(leaderShardCreator)),
+                Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 member2ShardID.toString());
             // Sleep to let election happen
             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
@@ -2662,4 +2659,19 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
+
+    @Test
+    public void testServerRemoved() throws Exception {
+        final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+        final ActorRef shard = parent.underlyingActor().context().actorOf(
+                newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                "testServerRemoved");
+
+        shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+        MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
+
+    }
+
 }