Send commitIndex updates to followers as soon as possible
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 982ac7bc38fe5238ab676a7fb211bdcf0e5dfb7f..a3afff3a8193cea74a267912c1a4bbef93c082d1 100644 (file)
@@ -26,6 +26,7 @@ import akka.actor.Terminated;
 import akka.testkit.TestActorRef;
 import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -163,11 +164,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return sendReplicate(actorContext, 1, index);
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+            final long index) {
         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+            final Payload payload) {
         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
@@ -375,22 +378,43 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
             sendReplicate(actorContext, lastIndex + i + 1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
         }
 
-        for (int i = 3; i < 5; i++) {
-            sendReplicate(actorContext, lastIndex + i + 1);
+        // We are expecting six messages here -- a request to replicate and a consensus-reached message
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+        for (int i = 0; i < 3; i++) {
+            assertRequestEntry(lastIndex, allMessages, i);
+            assertCommitEntry(lastIndex, allMessages, i);
         }
 
-        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
-        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
-        // get sent to the follower - but not the 5th
-        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+        // Now perform another commit, eliciting a request to persist
+        sendReplicate(actorContext, lastIndex + 3 + 1);
+        allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // This elicits another message for request to replicate
+        assertEquals("The number of request entries collected", 7, allMessages.size());
+        assertRequestEntry(lastIndex, allMessages, 3);
 
-        for (int i = 0; i < 4; i++) {
-            long expected = allMessages.get(i).getEntries().get(0).getIndex();
-            assertEquals(expected, i + 2);
-        }
+        sendReplicate(actorContext, lastIndex + 4 + 1);
+        allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of request entries collected", 7, allMessages.size());
+    }
+
+    private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+            final int messageNr) {
+        final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+        assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+        assertEquals(ImmutableList.of(), commitReq.getEntries());
+    }
+
+    private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+            final int messageNr) {
+        final AppendEntries req = allMessages.get(2 * messageNr);
+        assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+        final List<ReplicatedLogEntry> entries = req.getEntries();
+        assertEquals(1, entries.size());
+        assertEquals(messageNr + 2, entries.get(0).getIndex());
     }
 
     @Test
@@ -775,7 +799,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -795,7 +820,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
@@ -807,7 +833,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
         MessageCollectorActor.clearMessages(followerActor);
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }
 
@@ -2376,6 +2403,59 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
     }
 
+    @Test
+    public void testLeaderAddressInAppendEntries() {
+        logStart("testLeaderAddressInAppendEntries");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+            peerId -> leaderActor.path().toString());
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Initial heartbeat shouldn't have the leader address
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower needs the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+                RaftVersions.CURRENT_VERSION));
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertTrue(appendEntries.getLeaderAddress().isPresent());
+        assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower does not need the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+                RaftVersions.CURRENT_VERSION));
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
             final ActorRef actorRef, final RaftRPC rpc) {