Take snapshot after recovery on migrated messages
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
index 514346f1ab30334417b3f40c360759db1d5a7a3a..3145950b80ed5ca5af4b4cac2bd110d83d9fa407 100644 (file)
@@ -34,15 +34,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -59,6 +56,10 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -80,6 +81,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     static final String NEW_SERVER_ID = "new-server";
     static final String NEW_SERVER_ID2 = "new-server2";
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
+    private static final Class<?> COMMIT_MESSAGE_CLASS = RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.getClass();
     private static final boolean NO_PERSISTENCE = false;
     private static final boolean PERSISTENT = true;
 
@@ -373,7 +375,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         followerActor.underlyingActor().setBehavior(newFollower2);
 
         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
-        newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+        newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
@@ -436,7 +438,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
 
-        String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+        Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
@@ -479,7 +481,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
         // Drop commit message so the snapshot doesn't complete.
-        leaderRaftActor.setDropMessageOfType(String.class);
+        leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
 
         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
 
@@ -512,13 +514,13 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
 
         // Drop the commit message so the snapshot doesn't complete yet.
-        leaderRaftActor.setDropMessageOfType(String.class);
+        leaderRaftActor.setDropMessageOfType(COMMIT_MESSAGE_CLASS);
 
         leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
-        String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+        Object commitMsg = expectFirstMatching(leaderCollectorActor, COMMIT_MESSAGE_CLASS);
 
         // Change the leader behavior to follower
         leaderActor.tell(new Follower(leaderActorContext), leaderActor);
@@ -604,7 +606,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
         // Drop the InstallSnapshot message so it times out
-        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
@@ -1084,12 +1086,28 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         LOG.info("testChangeLeaderToNonVoting ending");
     }
 
+    @Test
+    public void testChangeLeaderToNonVotingInSingleNode() {
+        LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
+                        withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+
+        leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
+
+        LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
+    }
+
     @Test
     public void testChangeToVotingWithNoLeader() {
         LOG.info("testChangeToVotingWithNoLeader starting");
 
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(5);
 
         final String node1ID = "node1";
         final String node2ID = "node2";
@@ -1105,8 +1123,10 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
 
         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1124,8 +1144,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
 
-        // Wait for snapshot after recovery
-        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+        node1RaftActor.waitForInitializeBehaviorComplete();
+        node2RaftActor.waitForInitializeBehaviorComplete();
 
         // Verify the intended server config was loaded and applied.
         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
@@ -1135,7 +1155,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
         assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
 
-        MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
+        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+                nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
+                votingServer("downNode2"));
         assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
 
         // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
@@ -1151,6 +1173,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
 
+        // Send an AppendEntries so node1 has a leaderId
+
+        MessageCollectorActor.clearMessages(node1Collector);
+
+        long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
+        node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
+
+        // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
+        // ChangeServersVotingStatus message, it will try to elect a leader.
+
+        MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
+
         // Update node2's peer address and send the message again
 
         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
@@ -1185,13 +1220,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         final String node1ID = "node1";
         final String node2ID = "node2";
 
-        PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
-            @Override
-            public String resolve(String peerId) {
-                return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
-                    peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
-            }
-        };
+        PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+            peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
 
         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
@@ -1225,9 +1255,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
 
-        // Wait for snapshot after recovery
-        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
-
         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
         // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
@@ -1255,13 +1282,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         final String node1ID = "node1";
         final String node2ID = "node2";
 
-        PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
-            @Override
-            public String resolve(String peerId) {
-                return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
-                    peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
-            }
-        };
+        PeerAddressResolver peerAddressResolver = peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+            peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
 
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
@@ -1278,6 +1300,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
                 new MockRaftActorContext.MockPayload("2")));
+        InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
 
         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1295,9 +1318,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
 
-        // Wait for snapshot after recovery
-        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
-
         // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
         // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
@@ -1334,13 +1354,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         final String node1ID = "node1";
         final String node2ID = "node2";
 
-        configParams.setPeerAddressResolver(new PeerAddressResolver() {
-            @Override
-            public String resolve(String peerId) {
-                return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
-                    peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
-            }
-        });
+        configParams.setPeerAddressResolver(peerId -> peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+            peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null);
 
         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
                 new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
@@ -1367,9 +1382,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                         PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
         CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
 
-        // Wait for snapshot after recovery
-        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
-
         // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
         // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
         // RequestVote messages in node2 and make it the leader so node1 should forward the server change
@@ -1383,7 +1395,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
 
-        node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+        node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
 
         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
@@ -1402,7 +1414,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
     }
 
-    private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
+    private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
             for(RaftActor raftActor: raftActors) {