+ @Test
+ public void testLaggingFollowerStarvation() throws Exception {
+ logStart("testLaggingFollowerStarvation");
+ new JavaTestKit(getSystem()) {{
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
+
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ for(int i=1;i<6;i++) {
+ // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
+ assertTrue(newBehavior == leader);
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+
+ // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+ List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+ assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+ heartbeats.size() > 1);
+
+ // Check if follower-2 got AppendEntries during this time and was not starved
+ List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+ assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+ appendEntries.size() > 1);
+
+ }};
+ }
+
+ @Override
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ ActorRef actorRef, RaftRPC rpc) throws Exception {
+ super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+ assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
+ }
+