import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
+ short payloadVersion = (short)5;
+ actorContext.setPayloadVersion(payloadVersion);
long term = 1;
actorContext.getTermInformation().update(term, "");
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
// The follower would normally reply - simulate that explicitly here.
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex - 1, term));
+ FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
assertEquals("Entries size", 1, appendEntries.getEntries().size());
assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
}
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
- MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
- 1, lastIndex + 1, payload);
- actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
// State should not change
assertTrue(raftBehavior instanceof Leader);
assertEquals("Entries size", 1, appendEntries.getEntries().size());
assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
- assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
}
@Test
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
for(int i=0;i<3;i++) {
sendReplicate(actorContext, lastIndex+i+1);
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
}
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
// The follower would normally reply - simulate that explicitly here.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
leader.handleMessage(followerActor, new AppendEntriesReply(
- FOLLOWER_ID, term, true, lastIndex, term));
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
followerActor.underlyingActor().clear();
assertTrue(raftBehavior instanceof Leader);
- MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
}
@Test
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
- assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
@Test
return context;
}
+ private MockRaftActorContext createFollowerActorContextWithLeader() {
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+ followerConfig.setElectionTimeoutFactor(10000);
+ followerActorContext.setConfigParams(followerConfig);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ return followerActorContext;
+ }
+
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
}
@Test
- public void testHandleAppendEntriesReplyFailure(){
- logStart("testHandleAppendEntriesReplyFailure");
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ long leaderCommitIndex = 2;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(0);
+ followerActorContext.setLastApplied(0);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
leader = new Leader(leaderActorContext);
- // Send initial heartbeat reply with last index.
- leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
- assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+ assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+ }
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
- assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
}
@Test
leader = new Leader(leaderActorContext);
- AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
+ short payloadVersion = 5;
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
ApplyState applyState = applyStateList.get(0);
assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals(payloadVersion, followerInfo.getPayloadVersion());
}
@Test
leader = new Leader(leaderActorContext);
- AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
assertEquals(RaftState.Leader, raftActorBehavior.state());
}
+ @Test
+ public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+ logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+ long leaderCommitIndex = 3;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+ ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+ MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+ assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
@Test
public void testHandleRequestVoteReply(){
logStart("testHandleRequestVoteReply");
}};
}
-
- @Test
- public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
- logStart("testAppendEntryCallAtEndofAppendEntryReply");
-
- MockRaftActorContext leaderActorContext = createActorContextWithFollower();
-
- DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
- configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
- leaderActorContext.setConfigParams(configParams);
-
- MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
-
- followerActorContext.setConfigParams(configParams);
- followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
-
- Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().setBehavior(follower);
-
- leaderActorContext.getReplicatedLog().removeFrom(0);
- leaderActorContext.setCommitIndex(-1);
- leaderActorContext.setLastApplied(-1);
-
- followerActorContext.getReplicatedLog().removeFrom(0);
- followerActorContext.setCommitIndex(-1);
- followerActorContext.setLastApplied(-1);
-
- leader = new Leader(leaderActorContext);
-
- AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
- leaderActor, AppendEntriesReply.class);
-
- leader.handleMessage(followerActor, appendEntriesReply);
-
- // Clear initial heartbeat messages
-
- leaderActor.underlyingActor().clear();
- followerActor.underlyingActor().clear();
-
- // create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
- leaderActorContext.setCommitIndex(1);
- leaderActorContext.setLastApplied(1);
-
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
- // Should send first log entry
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(0, appendEntries.getEntries().get(0).getIndex());
- assertEquals(-1, appendEntries.getPrevLogIndex());
-
- appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
-
- assertEquals(1, appendEntriesReply.getLogLastTerm());
- assertEquals(0, appendEntriesReply.getLogLastIndex());
-
- followerActor.underlyingActor().clear();
-
- leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
- appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
- // Should send second log entry
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-
- follower.close();
- }
-
@Test
public void testLaggingFollowerStarvation() throws Exception {
logStart("testLaggingFollowerStarvation");
for(int i=1;i<6;i++) {
// Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
- RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
assertTrue(newBehavior == leader);
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
}