From 615798c6573f1689068d6da14963112174c0702a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 11 Jun 2019 15:42:34 +0200 Subject: [PATCH] Send commitIndex updates to followers as soon as possible When the commitIndex gets updated when we reach consensus, we should propagate this fact to the sender peer. Otherwise data tree change listeners on peers may be unnecessarily delayed, as they end up waiting for the heartbeat interval (or non-batched entries) to be propagated out. This results in observably-better behaviour between leader and follower, as the leader is sending out notifications of consensus, without delaying for heartbeat -- which is reflected in the changes to LeaderTest. Furthermore, DelayedMessagesElectionScenarioTest is detecting the change, as two members may achieve asynchronous consensus -- hence the leader would be sending two messages (request to persist, commitIndex) to push the consensus out. This exposes a race in the test, where we do not know the order between TimeoutNow and AppendEntries. If AppendEntries is processed by member 2 after TimeoutNow, leader will know about the new term due to AppendEntriesReply we send out. The cluster will still converge, but the assert for leader will be different if it manages to process the message. Fix this by isolating member 2. 
JIRA: CONTROLLER-1900 Change-Id: I695ef25c7a4cf8799c9c5e04c2c33fbf3e2f21df Signed-off-by: Robert Varga (cherry picked from commit c2b0e92d2c3ba05abf0fefb24413c68fd1c56bea) --- .../cluster/raft/FollowerLogInformation.java | 30 ++++++++--- .../raft/behaviors/AbstractLeader.java | 7 +-- .../raft/FollowerLogInformationTest.java | 17 ++++--- .../DelayedMessagesElectionScenarioTest.java | 13 +++-- .../cluster/raft/behaviors/LeaderTest.java | 50 ++++++++++++++----- 5 files changed, 81 insertions(+), 36 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index a5f24990f6..a76d6a29c2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -36,6 +36,8 @@ public final class FollowerLogInformation { private long lastReplicatedIndex = -1L; + private long sentCommitIndex = -1L; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); private short payloadVersion = -1; @@ -236,15 +238,18 @@ public final class FollowerLogInformation { * sending duplicate message too frequently if the last replicate message was sent and no reply has been received * yet within the current heart beat interval * + * @param commitIndex current commitIndex * @return true if it is OK to replicate, false otherwise */ - public boolean okToReplicate() { + public boolean okToReplicate(final long commitIndex) { if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } - // Return false if we are trying to send duplicate data before the heartbeat interval - if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) 
+ // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes + // also our commitIndex, as followers need to be told of new commitIndex as soon as possible. + if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex) + && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams().getHeartBeatInterval().toMillis()) { return false; } @@ -345,19 +350,28 @@ public final class FollowerLogInformation { return slicedLogEntryIndex != NO_INDEX; } - public void setNeedsLeaderAddress(boolean value) { + public void setNeedsLeaderAddress(final boolean value) { needsLeaderAddress = value; } - public @Nullable String needsLeaderAddress(String leaderId) { + public @Nullable String needsLeaderAddress(final String leaderId) { return needsLeaderAddress ? context.getPeerAddress(leaderId) : null; } + public boolean hasStaleCommitIndex(final long commitIndex) { + return sentCommitIndex != commitIndex; + } + + public void setSentCommitIndex(final long commitIndex) { + sentCommitIndex = commitIndex; + } + @Override public String toString() { return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex - + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() - + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" - + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex + + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java 
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index ec46593503..4ce84bd11f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -677,7 +677,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if (installSnapshotState.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerLogInformation); } - } else if (sendHeartbeat) { + } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntries = true; } @@ -698,7 +698,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - if (followerLogInformation.okToReplicate()) { + if (followerLogInformation.okToReplicate(context.getCommitIndex())) { entries = getEntriesToSend(followerLogInformation, followerActor); sendAppendEntries = true; } @@ -726,7 +726,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().size()); } - } else if (sendHeartbeat) { + } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) { // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again sendAppendEntries = true; @@ -837,6 +837,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { appendEntries); } + followerLogInformation.setSentCommitIndex(leaderCommitIndex); followerActor.tell(appendEntries, actor()); } diff --git 
a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java index 8e80e30d8b..9788b8fe8d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationTest.java @@ -56,7 +56,7 @@ public class FollowerLogInformationTest { // we cannot rely comfortably that the sleep will indeed sleep for the desired time // hence getting the actual elapsed time and do a match. // if the sleep has spilled over, then return the test gracefully - private static long sleepWithElaspsedTimeReturned(long millis) { + private static long sleepWithElaspsedTimeReturned(final long millis) { Stopwatch stopwatch = Stopwatch.createStarted(); Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS); stopwatch.stop(); @@ -70,16 +70,17 @@ public class FollowerLogInformationTest { FollowerLogInformation followerLogInformation = new FollowerLogInformation(new PeerInfo("follower1", null, VotingState.VOTING), 10, context); - assertTrue(followerLogInformation.okToReplicate()); - assertFalse(followerLogInformation.okToReplicate()); + followerLogInformation.setSentCommitIndex(0); + assertTrue(followerLogInformation.okToReplicate(0)); + assertFalse(followerLogInformation.okToReplicate(0)); // wait for 150 milliseconds and it should work again Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - assertTrue(followerLogInformation.okToReplicate()); + assertTrue(followerLogInformation.okToReplicate(0)); //increment next index and try immediately and it should work again followerLogInformation.incrNextIndex(); - assertTrue(followerLogInformation.okToReplicate()); + assertTrue(followerLogInformation.okToReplicate(0)); 
} @Test @@ -89,13 +90,13 @@ public class FollowerLogInformationTest { context.setCommitIndex(0); FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context); - assertFalse(followerLogInformation.okToReplicate()); + assertFalse(followerLogInformation.okToReplicate(0)); followerLogInformation.markFollowerActive(); assertFalse(followerLogInformation.isFollowerActive()); peerInfo.setVotingState(VotingState.VOTING); - assertTrue(followerLogInformation.okToReplicate()); + assertTrue(followerLogInformation.okToReplicate(0)); followerLogInformation.markFollowerActive(); assertTrue(followerLogInformation.isFollowerActive()); @@ -108,7 +109,7 @@ public class FollowerLogInformationTest { context.setCommitIndex(0); FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context); - assertTrue(followerLogInformation.okToReplicate()); + assertTrue(followerLogInformation.okToReplicate(0)); followerLogInformation.markFollowerActive(); assertTrue(followerLogInformation.isFollowerActive()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java index d481e6f149..52985fd3c1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java @@ -15,6 +15,7 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; 
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; @@ -148,15 +149,19 @@ public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionS // should switch to Candidate and send out RequestVote messages. Set member 1 and 3 actors // to capture RequestVote but not to forward to the behavior just yet as we want to // control the order of RequestVote messages to member 1 and 3. - - member1Actor.dropMessagesToBehavior(RequestVote.class); - member2Actor.expectBehaviorStateChange(); + // member 1 and member 3 may reach consensus to consider leader's initial Noop entry as committed, hence + // leader would elicit this information to member 2. + // We do not want that, as member 2 would respond to that request either before it bumps or after it bumps its + // term -- if it would see that message post-bump, it would leak term 2 back to member 1, hence leader would + // know about it. 
+ member2Actor.dropMessagesToBehavior(AppendEntries.class); + + member1Actor.dropMessagesToBehavior(RequestVote.class); member3Actor.dropMessagesToBehavior(RequestVote.class); member2ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - member1Actor.waitForExpectedMessages(RequestVote.class); member3Actor.waitForExpectedMessages(RequestVote.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 78ec33cba1..3805576ae1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -27,6 +27,7 @@ import akka.protobuf.ByteString; import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; @@ -163,11 +164,13 @@ public class LeaderTest extends AbstractLeaderTest { return sendReplicate(actorContext, 1, index); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, + final long index) { return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index, + final Payload payload) { SimpleReplicatedLogEntry newEntry = new 
SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); @@ -375,22 +378,43 @@ public class LeaderTest extends AbstractLeaderTest { sendReplicate(actorContext, lastIndex + i + 1); leader.handleMessage(followerActor, new AppendEntriesReply( FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); - } - for (int i = 3; i < 5; i++) { - sendReplicate(actorContext, lastIndex + i + 1); + // We are expecting six messages here -- a request to replicate and a consensus-reached message + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request/consensus appends collected", 6, allMessages.size()); + for (int i = 0; i < 3; i++) { + assertRequestEntry(lastIndex, allMessages, i); + assertCommitEntry(lastIndex, allMessages, i); } - List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); - // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would - // get sent to the follower - but not the 5th - assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + // Now perform another commit, eliciting a request to persist + sendReplicate(actorContext, lastIndex + 3 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // This elicits another message for request to replicate + assertEquals("The number of request entries collected", 7, allMessages.size()); + assertRequestEntry(lastIndex, allMessages, 3); - for (int i = 0; i < 4; i++) { - long expected = allMessages.get(i).getEntries().get(0).getIndex(); - assertEquals(expected, i + 2); - } + sendReplicate(actorContext, lastIndex + 4 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request entries collected", 7, 
allMessages.size()); + } + + private static void assertCommitEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries commitReq = allMessages.get(2 * messageNr + 1); + assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit()); + assertEquals(ImmutableList.of(), commitReq.getEntries()); + } + + private static void assertRequestEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries req = allMessages.get(2 * messageNr); + assertEquals(lastIndex + messageNr, req.getLeaderCommit()); + + final List entries = req.getEntries(); + assertEquals(1, entries.size()); + assertEquals(messageNr + 2, entries.get(0).getIndex()); } @Test -- 2.36.6