Send commitIndex updates to followers as soon as possible 98/82498/1
author Robert Varga <robert.varga@pantheon.tech>
Tue, 11 Jun 2019 13:42:34 +0000 (15:42 +0200)
committer Robert Varga <nite@hq.sk>
Wed, 12 Jun 2019 08:18:59 +0000 (08:18 +0000)
When the commitIndex gets updated as we reach consensus, we should
propagate this fact to the sender peer. Otherwise data tree change
listeners on peers may be unnecessarily delayed, as they end up
waiting for the heartbeat interval (or non-batched entries) before the
update is propagated out.

This results in observably better behaviour between leader and follower,
as the leader is sending out notifications of consensus without
delaying for a heartbeat -- which is reflected in the changes to
LeaderTest.

Furthermore, DelayedMessagesElectionScenarioTest is detecting the change,
as two members may achieve asynchronous consensus -- hence the leader
would be sending two messages (request to persist, commitIndex) to push
the consensus out. This exposes a race in the test, where we do not know
the order between TimeoutNow and AppendEntries. If AppendEntries is
processed by member 2 after TimeoutNow, the leader will learn about the new
term from the AppendEntriesReply that member 2 sends out. The cluster will
still converge, but the assert for the leader will be different if it manages
to process the message. Fix this by isolating member 2.

JIRA: CONTROLLER-1900
Change-Id: I695ef25c7a4cf8799c9c5e04c2c33fbf3e2f21df
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c2b0e92d2c3ba05abf0fefb24413c68fd1c56bea)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index a5f24990f6a54b4542bcc67dee35f2cdd4c56467..a76d6a29c272db22c34ff66c23769384305f19e9 100644 (file)
@@ -36,6 +36,8 @@ public final class FollowerLogInformation {
 
     private long lastReplicatedIndex = -1L;
 
+    private long sentCommitIndex = -1L;
+
     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
     private short payloadVersion = -1;
@@ -236,15 +238,18 @@ public final class FollowerLogInformation {
      * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
      * yet within the current heart beat interval
      *
+     * @param commitIndex current commitIndex
      * @return true if it is OK to replicate, false otherwise
      */
-    public boolean okToReplicate() {
+    public boolean okToReplicate(final long commitIndex) {
         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
-        // Return false if we are trying to send duplicate data before the heartbeat interval
-        if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+        // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+        // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+        if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+                && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
                 < context.getConfigParams().getHeartBeatInterval().toMillis()) {
             return false;
         }
@@ -345,19 +350,28 @@ public final class FollowerLogInformation {
         return slicedLogEntryIndex != NO_INDEX;
     }
 
-    public void setNeedsLeaderAddress(boolean value) {
+    public void setNeedsLeaderAddress(final boolean value) {
         needsLeaderAddress = value;
     }
 
-    public @Nullable String needsLeaderAddress(String leaderId) {
+    public @Nullable String needsLeaderAddress(final String leaderId) {
         return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
     }
 
+    public boolean hasStaleCommitIndex(final long commitIndex) {
+        return sentCommitIndex != commitIndex;
+    }
+
+    public void setSentCommitIndex(final long commitIndex) {
+        sentCommitIndex = commitIndex;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
-                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
-                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
-                + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+                + ", votingState=" + peerInfo.getVotingState()
+                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+                + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
     }
 }
index ec465935039eb99a0c34341e05aa93bd2340e34d..4ce84bd11f8ed8edccc4d8699a5202850dbef5be 100644 (file)
@@ -677,7 +677,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     } else if (installSnapshotState.canSendNextChunk()) {
                         sendSnapshotChunk(followerActor, followerLogInformation);
                     }
-                } else if (sendHeartbeat) {
+                } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
                 }
@@ -698,7 +698,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
                             followerNextIndex, followerId);
 
-                    if (followerLogInformation.okToReplicate()) {
+                    if (followerLogInformation.okToReplicate(context.getCommitIndex())) {
                         entries = getEntriesToSend(followerLogInformation, followerActor);
                         sendAppendEntries = true;
                     }
@@ -726,7 +726,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                                 context.getReplicatedLog().size());
                     }
 
-                } else if (sendHeartbeat) {
+                } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
                     // we send an AppendEntries, even if the follower is inactive
                     // in-order to update the followers timestamp, in case it becomes active again
                     sendAppendEntries = true;
@@ -837,6 +837,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     appendEntries);
         }
 
+        followerLogInformation.setSentCommitIndex(leaderCommitIndex);
         followerActor.tell(appendEntries, actor());
     }
 
index 8e80e30d8bee736e6aad87cd3992c2065085d932..9788b8fe8d427670c12b8a9619872a4c3b6a10ca 100644 (file)
@@ -56,7 +56,7 @@ public class FollowerLogInformationTest {
     // we cannot rely comfortably that the sleep will indeed sleep for the desired time
     // hence getting the actual elapsed time and do a match.
     // if the sleep has spilled over, then return the test gracefully
-    private static long sleepWithElaspsedTimeReturned(long millis) {
+    private static long sleepWithElaspsedTimeReturned(final long millis) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS);
         stopwatch.stop();
@@ -70,16 +70,17 @@ public class FollowerLogInformationTest {
         FollowerLogInformation followerLogInformation =
                 new FollowerLogInformation(new PeerInfo("follower1", null, VotingState.VOTING), 10, context);
 
-        assertTrue(followerLogInformation.okToReplicate());
-        assertFalse(followerLogInformation.okToReplicate());
+        followerLogInformation.setSentCommitIndex(0);
+        assertTrue(followerLogInformation.okToReplicate(0));
+        assertFalse(followerLogInformation.okToReplicate(0));
 
         // wait for 150 milliseconds and it should work again
         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
-        assertTrue(followerLogInformation.okToReplicate());
+        assertTrue(followerLogInformation.okToReplicate(0));
 
         //increment next index and try immediately and it should work again
         followerLogInformation.incrNextIndex();
-        assertTrue(followerLogInformation.okToReplicate());
+        assertTrue(followerLogInformation.okToReplicate(0));
     }
 
     @Test
@@ -89,13 +90,13 @@ public class FollowerLogInformationTest {
         context.setCommitIndex(0);
         FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context);
 
-        assertFalse(followerLogInformation.okToReplicate());
+        assertFalse(followerLogInformation.okToReplicate(0));
 
         followerLogInformation.markFollowerActive();
         assertFalse(followerLogInformation.isFollowerActive());
 
         peerInfo.setVotingState(VotingState.VOTING);
-        assertTrue(followerLogInformation.okToReplicate());
+        assertTrue(followerLogInformation.okToReplicate(0));
 
         followerLogInformation.markFollowerActive();
         assertTrue(followerLogInformation.isFollowerActive());
@@ -108,7 +109,7 @@ public class FollowerLogInformationTest {
         context.setCommitIndex(0);
         FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context);
 
-        assertTrue(followerLogInformation.okToReplicate());
+        assertTrue(followerLogInformation.okToReplicate(0));
 
         followerLogInformation.markFollowerActive();
         assertTrue(followerLogInformation.isFollowerActive());
index d481e6f1491ddaf82e11d7f8b000dab5b3d301b4..52985fd3c134ef82cdf3637c4b966a10b961049c 100644 (file)
@@ -15,6 +15,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
@@ -148,15 +149,19 @@ public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionS
         // should switch to Candidate and send out RequestVote messages. Set member 1 and 3 actors
         // to capture RequestVote but not to forward to the behavior just yet as we want to
         // control the order of RequestVote messages to member 1 and 3.
-
-        member1Actor.dropMessagesToBehavior(RequestVote.class);
-
         member2Actor.expectBehaviorStateChange();
 
+        // member 1 and member 3 may reach consensus to consider leader's initial Noop entry as committed, hence
+        // leader would elicit this information to member 2.
+        // We do not want that, as member 2 would respond to that request either before it bumps or after it bumps its
+        // term -- if it would see that message post-bump, it would leak term 2 back to member 1, hence leader would
+        // know about it.
+        member2Actor.dropMessagesToBehavior(AppendEntries.class);
+
+        member1Actor.dropMessagesToBehavior(RequestVote.class);
         member3Actor.dropMessagesToBehavior(RequestVote.class);
 
         member2ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
-
         member1Actor.waitForExpectedMessages(RequestVote.class);
         member3Actor.waitForExpectedMessages(RequestVote.class);
 
index 78ec33cba12deca93e9be5746eb7ce682301f2ac..3805576ae19b15b40d991427c4a2a4bdf9b3599b 100644 (file)
@@ -27,6 +27,7 @@ import akka.protobuf.ByteString;
 import akka.testkit.TestActorRef;
 import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -163,11 +164,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return sendReplicate(actorContext, 1, index);
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+            final long index) {
         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+            final Payload payload) {
         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
@@ -375,22 +378,43 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
             sendReplicate(actorContext, lastIndex + i + 1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
         }
 
-        for (int i = 3; i < 5; i++) {
-            sendReplicate(actorContext, lastIndex + i + 1);
+        // We are expecting six messages here -- a request to replicate and a consensus-reached message
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+        for (int i = 0; i < 3; i++) {
+            assertRequestEntry(lastIndex, allMessages, i);
+            assertCommitEntry(lastIndex, allMessages, i);
         }
 
-        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
-        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
-        // get sent to the follower - but not the 5th
-        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+        // Now perform another commit, eliciting a request to persist
+        sendReplicate(actorContext, lastIndex + 3 + 1);
+        allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // This elicits another message for request to replicate
+        assertEquals("The number of request entries collected", 7, allMessages.size());
+        assertRequestEntry(lastIndex, allMessages, 3);
 
-        for (int i = 0; i < 4; i++) {
-            long expected = allMessages.get(i).getEntries().get(0).getIndex();
-            assertEquals(expected, i + 2);
-        }
+        sendReplicate(actorContext, lastIndex + 4 + 1);
+        allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of request entries collected", 7, allMessages.size());
+    }
+
+    private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+            final int messageNr) {
+        final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+        assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+        assertEquals(ImmutableList.of(), commitReq.getEntries());
+    }
+
+    private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+            final int messageNr) {
+        final AppendEntries req = allMessages.get(2 * messageNr);
+        assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+        final List<ReplicatedLogEntry> entries = req.getEntries();
+        assertEquals(1, entries.size());
+        assertEquals(messageNr + 2, entries.get(0).getIndex());
     }
 
     @Test