Fix issues when persistence enabled
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupportTest.java
index 816e5b217093d1b85a9efd48b2c5b12965f9041d..cba4dd9cdf9c627e9aad499f1ed67b80cecf08b6 100644 (file)
@@ -34,14 +34,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -58,6 +56,10 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -206,20 +208,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
         assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
 
-        List<ReplicatedLogImplEntry> persistedLogEntries = InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class);
-        assertEquals("Leader ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
-        ReplicatedLogImplEntry logEntry = persistedLogEntries.get(0);
-        assertEquals("Leader ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
-        assertEquals("Leader ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
-        assertEquals("Leader ReplicatedLogImplEntry getData", ServerConfigurationPayload.class, logEntry.getData().getClass());
-
-        persistedLogEntries = InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class);
-        assertEquals("New follower ReplicatedLogImplEntry entries", 1, persistedLogEntries.size());
-        logEntry = persistedLogEntries.get(0);
-        assertEquals("New follower ReplicatedLogImplEntry getTerm", 1, logEntry.getTerm());
-        assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
-        assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
-                logEntry.getData().getClass());
+        assertEquals("Leader persisted ReplicatedLogImplEntry entries", 0,
+                InMemoryJournal.get(LEADER_ID, ReplicatedLogImplEntry.class).size());
+        assertEquals("Leader persisted ServerConfigurationPayload entries", 1,
+                InMemoryJournal.get(LEADER_ID, ServerConfigurationPayload.class).size());
+
+        assertEquals("New follower persisted ReplicatedLogImplEntry entries", 0,
+                InMemoryJournal.get(NEW_SERVER_ID, ReplicatedLogImplEntry.class).size());
+        assertEquals("New follower persisted ServerConfigurationPayload entries", 1,
+                InMemoryJournal.get(NEW_SERVER_ID, ServerConfigurationPayload.class).size());
 
         LOG.info("testAddServerWithExistingFollower ending");
     }
@@ -373,7 +370,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         followerActor.underlyingActor().setBehavior(newFollower2);
 
         MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
-        newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+        newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.class);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
@@ -604,7 +601,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
         // Drop the InstallSnapshot message so it times out
-        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.class);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
@@ -1105,6 +1102,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(5);
 
         final String node1ID = "node1";
         final String node2ID = "node2";
@@ -1120,8 +1118,10 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
         InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node1ID, 3, new ApplyJournalEntries(0));
         InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 3, new ApplyJournalEntries(0));
 
         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1168,6 +1168,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
 
+        // Send an AppendEntries so node1 has a leaderId
+
+        MessageCollectorActor.clearMessages(node1Collector);
+
+        long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
+        node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
+
+        // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
+        // ChangeServersVotingStatus message, it will try to elect a leader.
+
+        MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
+
         // Update node2's peer address and send the message again
 
         node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
@@ -1282,6 +1295,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
                 new MockRaftActorContext.MockPayload("2")));
+        InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
 
         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1376,7 +1390,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
 
-        node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+        node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
 
         ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());