import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.TestActorFactory;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-public class LeaderTest extends AbstractRaftActorBehaviorTest {
+public class LeaderTest extends AbstractLeaderTest {
static final String FOLLOWER_ID = "follower";
- private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-
private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
private Leader leader;
+ @Override
@After
public void tearDown() throws Exception {
if(leader != null) {
leader.close();
}
- actorFactory.close();
+ super.tearDown();
}
@Test
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, index, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+ }
+
@Test
public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
1, lastIndex + 1, payload);
actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(null, null, newEntry));
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
// State should not change
assertTrue(raftBehavior instanceof Leader);
assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
+ @Test
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<5;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect only 1 message to be sent because of two reasons,
+ // - an append entries reply was not received
+ // - the heartbeat interval has not expired
+ // In this scenario if multiple messages are sent they would likely be duplicates
+ assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+ }
+
+ @Test
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+ logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+
+ }
+
+ for(int i=3;i<5;i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+ // get sent to the follower - but not the 5th
+ assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+ for(int i=0;i<4;i++) {
+ long expected = allMessages.get(i).getEntries().get(0).getIndex();
+ assertEquals(expected, i+2);
+ }
+ }
+
+ @Test
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+ logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ sendReplicate(actorContext, lastIndex+1);
+
+ // Wait slightly longer than heartbeat duration
+ Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(1, allMessages.get(0).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+ }
+
+ @Test
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+ logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+ }
+
+ @Test
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ sendReplicate(actorContext, lastIndex+1);
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(0, allMessages.get(0).getEntries().size());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ }
+
+
@Test
public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
leader.handleMessage(leaderActor, new SendHeartBeat());
- InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
- InstallSnapshot.SERIALIZABLE_CLASS);
-
- InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+ InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(snapshotIndex, is.getLastIncludedIndex());
}
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
+ actorContext.getReplicatedLog().append(entry);
+
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
// check if installsnapshot gets called with the correct values.
- InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
- MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertNotNull(installSnapshot.getData());
assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
- // FIXME - we don't set the term in the serialized message.
- //assertEquals(currentTerm, installSnapshot.getTerm());
+ assertEquals(currentTerm, installSnapshot.getTerm());
}
@Test
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
followerActor.underlyingActor().clear();
leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- installSnapshot = MessageCollectorActor.getFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
Assert.assertNull(installSnapshot);
}
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Optional.of(bs));
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
leader.handleMessage(leaderActor, new SendHeartBeat());
- installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
int hashCode = installSnapshot.getData().hashCode();
leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
FOLLOWER_ID, 1, true));
- installSnapshot = MessageCollectorActor.expectFirstMatching(
- followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
}
@Test
private MockRaftActorContext createActorContextWithFollower() {
MockRaftActorContext actorContext = createActorContext();
- actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+ actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
followerActor.path().toString()).build());
return actorContext;
}
return context;
}
- public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- AbstractRaftActorBehavior behavior;
-
- @Override public void onReceive(Object message) throws Exception {
- if(behavior != null) {
- behavior.handleMessage(sender(), message);
- }
-
- super.onReceive(message);
- }
-
- public static Props props() {
- return Props.create(ForwardMessageToBehaviorActor.class);
- }
- }
-
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ followerActor.underlyingActor().setBehavior(follower);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ followerActor.underlyingActor().setBehavior(follower);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
- leaderActor.underlyingActor().behavior = leader;
+ leaderActor.underlyingActor().setBehavior(follower);
leader.handleMessage(followerActor, appendEntriesReply);
leaderActor.underlyingActor().clear();
assertEquals(2, leaderActorContext.getCommitIndex());
- ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
- leaderActor, ApplyLogEntries.class);
+ ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+ leaderActor, ApplyJournalEntries.class);
assertEquals(2, leaderActorContext.getLastApplied());
- assertEquals(2, applyLogEntries.getToIndex());
+ assertEquals(2, applyJournalEntries.getToIndex());
List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
ApplyState.class);
leaderActorContext.setPeerAddresses(peerAddresses);
leader = new Leader(leaderActorContext);
- leader.stopIsolatedLeaderCheckSchedule();
leader.markFollowerActive("follower-1");
leader.markFollowerActive("follower-2");
followerActorContext.setConfigParams(configParams);
Follower follower = new Follower(followerActorContext);
- followerActor.underlyingActor().behavior = follower;
+ followerActor.underlyingActor().setBehavior(follower);
leaderActorContext.getReplicatedLog().removeFrom(0);
leaderActorContext.setCommitIndex(-1);
follower.close();
}
+ @Test
+ public void testLaggingFollowerStarvation() throws Exception {
+ logStart("testLaggingFollowerStarvation");
+ new JavaTestKit(getSystem()) {{
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
+
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ for(int i=1;i<6;i++) {
+ // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+ assertTrue(newBehavior == leader);
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+
+ // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+ List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+ assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+ heartbeats.size() > 1);
+
+ // Check if follower-2 got AppendEntries during this time and was not starved
+ List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+ assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+ appendEntries.size() > 1);
+
+ }};
+ }
+
+ @Override
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ ActorRef actorRef, RaftRPC rpc) throws Exception {
+ super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+ assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
+ }
+
private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
private final long electionTimeOutIntervalMillis;