+ @Test
+ public void testLeaderAddressInAppendEntries() {
+ logStart("testLeaderAddressInAppendEntries");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat shouldn't have the leader address
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertNull(appendEntries.leaderAddress());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower needs the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+ RaftVersions.CURRENT_VERSION));
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower does not need the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+ RaftVersions.CURRENT_VERSION));
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertNull(appendEntries.leaderAddress());
+ }
+