+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+ logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ short payloadVersion = (short)5;
+ actorContext.setPayloadVersion(payloadVersion);
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader should send an immediate heartbeat with no entries as follower is inactive.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getTerm", term, appendEntries.getTerm());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
+
+ // The follower would normally reply - simulate that explicitly here.
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+ getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
+ }
+