boolean updated = false;
if (appendEntriesReply.isSuccess()) {
- updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
- updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
-
- if(updated && LOG.isDebugEnabled()) {
- LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(),
- followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
- }
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about
+ long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+ ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+ if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+ followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+ // The follower's log is empty or the last entry is present in the leader's journal
+ // and the terms match so the follower is just behind the leader's journal from
+ // the last snapshot, if any. We'll catch up the follower quickly by starting at the
+ // follower's last log index.
- followerLogInformation.decrNextIndex();
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ } else {
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about.
+
+ followerLogInformation.decrNextIndex();
+ }
}
// Now figure out if this reply warrants a change in the commitIndex
return this;
}
+ private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
+ AppendEntriesReply appendEntriesReply) {
+ boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
+ updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
+
+ if(updated && LOG.isDebugEnabled()) {
+ LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
+ logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
+ }
+ return updated;
+ }
+
private void purgeInMemoryLog() {
//find the lowest index across followers which has been replicated to all.
// lastApplied if there are no followers, so that we keep clearing the log for single-node
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
- LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
- logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
}
if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
return context;
}
+ private MockRaftActorContext createFollowerActorContextWithLeader() {
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+ followerConfig.setElectionTimeoutFactor(10000);
+ followerActorContext.setConfigParams(followerConfig);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ return followerActorContext;
+ }
+
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
}
@Test
- public void testHandleAppendEntriesReplyFailure(){
- logStart("testHandleAppendEntriesReplyFailure");
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ long leaderCommitIndex = 2;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(1);
+ followerActorContext.setLastApplied(1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
leader = new Leader(leaderActorContext);
- // Send initial heartbeat reply with last index.
- leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Verify AppendEntries sent with the leader's second log entry.
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+ // Verify AppendEntries sent with the leader's third log entry.
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
- assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+ assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
+ assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+ }
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
- assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Verify AppendEntries sent with the leader's first log entry.
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+ // Verify AppendEntries sent with the leader's second log entry.
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Verify AppendEntries sent with the leader's first log entry.
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+ // Verify AppendEntries sent with the leader's third log entry.
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
}
@Test