From 6e4a2530fbd24f87b47a756bcc281cb825616e2a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 29 May 2015 23:27:33 -0400 Subject: [PATCH] Streamline updating out-of-sync follower The first AppendEntries message a leader sends to a follower contains the leader's entry at the current commit index as it doesn't know the follower's next index yet. If the the previous index isn't present in the follower's log, the follower sends back an unsuccessful reply. The leader then decrements the prior index it just sent and sends the next message. This continue's until the next index gets down to the follower's last index (or -1 if the follower's log is empty). Then it succeeds on the follower and the leader increments the next index and sends the rest. This results in at least twice the number of messages sent to sync the follower. The follower sends back its lastIndex in the unsuccessful responses so it seems to make sense for the leader to use that index instead of decrementing what it thinks might be the next index. This eliminates the excessive AppendEntries messages. However we should only do this is the follower's last index is present in the leader's log and the terms match. Otherwise we fall back to decrementing the index. Change-Id: If49ed48e8c70cb87b06b48a5ed3e6b0bbb6efc36 Signed-off-by: Tom Pantelis --- .../raft/behaviors/AbstractLeader.java | 49 ++-- .../cluster/raft/messages/AppendEntries.java | 2 +- .../cluster/raft/behaviors/LeaderTest.java | 217 +++++++++++++++++- 3 files changed, 243 insertions(+), 25 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index c2d9edd93a..8fb66998c7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -182,23 +182,29 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean updated = false; if (appendEntriesReply.isSuccess()) { - updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); - updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; - - if(updated && LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(), - followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); - } + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about + long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); + ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); + if(followerLastLogIndex < 0 || (followersLastLogEntry != null && + followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { + // The follower's log is empty or the last entry is present in the leader's journal + // and the terms match so the follower is just behind the leader's journal from + // the last snapshot, if any. We'll catch up the follower quickly by starting at the + // follower's last log index. - followerLogInformation.decrNextIndex(); + updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + } else { + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about. + + followerLogInformation.decrNextIndex(); + } } // Now figure out if this reply warrants a change in the commitIndex @@ -244,6 +250,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } + private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, + AppendEntriesReply appendEntriesReply) { + boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if(updated && LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", + logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); + } + return updated; + } + private void purgeInMemoryLog() { //find the lowest index across followers which has been replicated to all. // lastApplied if there are no followers, so that we keep clearing the log for single-node @@ -476,8 +495,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); + LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 6c1d6b47c7..d8075f4381 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -123,7 +123,7 @@ public class AppendEntries extends AbstractRaftRPC { builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex) .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit) .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=") - .append(payloadVersion).append("]"); + .append(payloadVersion).append(", entries=").append(entries).append("]"); return builder.toString(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 9094a65654..27854f40e9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; @@ -973,6 +974,15 @@ public class LeaderTest extends AbstractLeaderTest { return context; } + private MockRaftActorContext createFollowerActorContextWithLeader() { + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); + followerConfig.setElectionTimeoutFactor(10000); + followerActorContext.setConfigParams(followerConfig); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + return followerActorContext; + } + @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -1101,26 +1111,215 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyFailure(){ - logStart("testHandleAppendEntriesReplyFailure"); + public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){ + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + long leaderCommitIndex = 2; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(1); + followerActorContext.setLastApplied(1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); leader = new Leader(leaderActorContext); - // Send initial heartbeat reply with last index. - leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0)); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Verify AppendEntries sent with the leader's second log entry. + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + // Verify AppendEntries sent with the leader's third log entry. + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); - assertEquals("getNextIndex", 11, followerInfo.getNextIndex()); + assertEquals("getNextIndex", 3, followerInfo.getNextIndex()); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0); + assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex()); + } - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() { + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty"); - assertEquals(RaftState.Leader, raftActorBehavior.state()); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); - assertEquals("getNextIndex", 10, followerInfo.getNextIndex()); + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Verify AppendEntries sent with the leader's first log entry. + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + // Verify AppendEntries sent with the leader's second log entry. + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + } + + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){ + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Verify AppendEntries sent with the leader's first log entry. + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + // Verify AppendEntries sent with the leader's third log entry. + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm()); } @Test -- 2.36.6