Remove support for pre-Fluorine versions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index ae58652b20fce8d683a63996bf7a513d8f0d1b64..c3f3b3296bc6456b8b21e4126c2fe05399c2f689 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
@@ -23,20 +22,19 @@ import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
+import akka.protobuf.ByteString;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.SerializationUtils;
@@ -65,6 +63,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
@@ -73,7 +72,6 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -104,7 +102,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleMessageForUnknownMessage() throws Exception {
+    public void testHandleMessageForUnknownMessage() {
         logStart("testHandleMessageForUnknownMessage");
 
         leader = new Leader(createActorContext());
@@ -114,7 +112,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -163,18 +161,20 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         return sendReplicate(actorContext, 1, index);
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+            final long index) {
         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
     }
 
-    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
+    private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+            final Payload payload) {
         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
     }
 
     @Test
-    public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+    public void testHandleReplicateMessageSendAppendEntriesToFollower() {
         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -211,7 +211,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -266,7 +266,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+    public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -304,7 +304,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -344,7 +344,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -375,26 +375,47 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
             sendReplicate(actorContext, lastIndex + i + 1);
             leader.handleMessage(followerActor, new AppendEntriesReply(
                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
         }
 
-        for (int i = 3; i < 5; i++) {
-            sendReplicate(actorContext, lastIndex + i + 1);
+        // We are expecting six messages here -- a request to replicate and a consensus-reached message
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+        for (int i = 0; i < 3; i++) {
+            assertRequestEntry(lastIndex, allMessages, i);
+            assertCommitEntry(lastIndex, allMessages, i);
         }
 
-        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
-        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
-        // get sent to the follower - but not the 5th
-        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+        // Now perform another commit, eliciting a request to persist
+        sendReplicate(actorContext, lastIndex + 3 + 1);
+        allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // This elicits another message for request to replicate
+        assertEquals("The number of request entries collected", 7, allMessages.size());
+        assertRequestEntry(lastIndex, allMessages, 3);
 
-        for (int i = 0; i < 4; i++) {
-            long expected = allMessages.get(i).getEntries().get(0).getIndex();
-            assertEquals(expected, i + 2);
-        }
+        sendReplicate(actorContext, lastIndex + 4 + 1);
+        allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of request entries collected", 7, allMessages.size());
+    }
+
+    private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+            final int messageNr) {
+        final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+        assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+        assertEquals(List.of(), commitReq.getEntries());
+    }
+
+    private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+            final int messageNr) {
+        final AppendEntries req = allMessages.get(2 * messageNr);
+        assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+        final List<ReplicatedLogEntry> entries = req.getEntries();
+        assertEquals(1, entries.size());
+        assertEquals(messageNr + 2, entries.get(0).getIndex());
     }
 
     @Test
-    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -439,7 +460,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -476,7 +497,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -516,7 +537,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
 
     @Test
-    public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+    public void testHandleReplicateMessageWhenThereAreNoFollowers() {
         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
 
         MockRaftActorContext actorContext = createActorContext();
@@ -564,11 +585,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         final MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        Map<String, String> leadersSnapshot = new HashMap<>();
-        leadersSnapshot.put("1", "A");
-        leadersSnapshot.put("2", "B");
-        leadersSnapshot.put("3", "C");
-
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
@@ -591,9 +607,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
-                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+        ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
+        leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
@@ -624,7 +640,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testSendAppendEntriesSnapshotScenario() throws Exception {
+    public void testSendAppendEntriesSnapshotScenario() {
         logStart("testSendAppendEntriesSnapshotScenario");
 
         final MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -673,7 +689,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testInitiateInstallSnapshot() throws Exception {
+    public void testInitiateInstallSnapshot() {
         logStart("testInitiateInstallSnapshot");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -699,7 +715,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(null);
+        leader.setSnapshotHolder(null);
 
         // new entry
         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
@@ -747,7 +763,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.getReplicatedLog().removeFrom(0);
 
-        AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+        AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
 
         leader = new Leader(actorContext);
@@ -757,7 +773,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(null);
+        leader.setSnapshotHolder(null);
 
         for (int i = 0; i < 4; i++) {
             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
@@ -775,7 +791,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -795,25 +812,27 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
         final byte[] bytes = new byte[]{1, 2, 3};
-        installSnapshotStream.get().get().write(bytes);
+        installSnapshotStream.get().orElseThrow().write(bytes);
         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
                 Runtime.getRuntime().totalMemory());
         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
         MessageCollectorActor.clearMessages(followerActor);
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }
 
 
     @Test
-    public void testInstallSnapshot() throws Exception {
+    public void testInstallSnapshot() {
         logStart("testInstallSnapshot");
 
         final MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -847,7 +866,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
-        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
@@ -868,7 +887,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testForceInstallSnapshot() throws Exception {
+    public void testForceInstallSnapshot() {
         logStart("testForceInstallSnapshot");
 
         final MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -899,7 +918,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
 
         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
-        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
@@ -953,8 +972,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
-                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+        leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
@@ -983,7 +1002,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+    public void testSendSnapshotfromInstallSnapshotReply() {
         logStart("testSendSnapshotfromInstallSnapshotReply");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -1023,8 +1042,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         ByteString bs = toByteString(leadersSnapshot);
         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
-                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-                -1, null, null);
+                List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
 
         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
@@ -1061,7 +1079,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -1097,8 +1115,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         ByteString bs = toByteString(leadersSnapshot);
         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
-                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-                -1, null, null);
+                List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
@@ -1126,7 +1143,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
@@ -1162,8 +1179,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         ByteString bs = toByteString(leadersSnapshot);
         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
-                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-                -1, null, null);
+                List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
 
         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
@@ -1172,8 +1188,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
-                installSnapshot.getLastChunkHashCode().get().intValue());
+        assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
+                installSnapshot.getLastChunkHashCode());
 
         final int hashCode = Arrays.hashCode(installSnapshot.getData());
 
@@ -1186,7 +1202,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertEquals(2, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
+        assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
     }
 
     @Test
@@ -1256,8 +1272,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
     private MockRaftActorContext createActorContextWithFollower() {
         MockRaftActorContext actorContext = createActorContext();
-        actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
-                followerActor.path().toString()).build());
+        actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
         return actorContext;
     }
 
@@ -1266,12 +1281,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
         followerConfig.setElectionTimeoutFactor(10000);
         followerActorContext.setConfigParams(followerConfig);
-        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+        followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
         return followerActorContext;
     }
 
     @Test
-    public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+    public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
 
         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
@@ -1326,13 +1341,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+    public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
 
         final MockRaftActorContext leaderActorContext = createActorContext();
 
         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
-        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+        followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
@@ -1696,7 +1711,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testHandleAppendEntriesReplySuccess() throws Exception {
+    public void testHandleAppendEntriesReplySuccess() {
         logStart("testHandleAppendEntriesReplySuccess");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
@@ -1713,7 +1728,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
 
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
-        assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+        assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
 
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
 
@@ -1767,7 +1782,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
-        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+        // Note: the size here depends on estimate
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
 
         leaderActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
@@ -1916,7 +1932,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
 
         // kill 1 follower and verify if that got killed
-        final JavaTestKit probe = new JavaTestKit(getSystem());
+        final TestKit probe = new TestKit(getSystem());
         probe.watch(followerActor1);
         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
@@ -1940,7 +1956,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+    public void testIsolatedLeaderCheckTwoFollowers() {
         logStart("testIsolatedLeaderCheckTwoFollowers");
 
         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
@@ -1950,7 +1966,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+    public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
 
         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
@@ -1960,7 +1976,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testLaggingFollowerStarvation() throws Exception {
+    public void testLaggingFollowerStarvation() {
         logStart("testLaggingFollowerStarvation");
 
         String leaderActorId = actorFactory.generateActorId("leader");
@@ -2029,8 +2045,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leaderActorContext.setLastApplied(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
-        TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
-                Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+        ActorRef nonVotingFollowerActor = actorFactory.createActor(
+                MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
 
         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
                 VotingState.NON_VOTING);
@@ -2112,7 +2128,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
-        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+        doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
 
@@ -2143,7 +2159,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
-        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+        doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2175,7 +2191,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
-        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+        doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2242,7 +2258,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
 
         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
-                Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+                List.of(new SimpleReplicatedLogEntry(0, 1,
                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
         final MockRaftActorContext.MockPayload largePayload =
                 new MockRaftActorContext.MockPayload("large", serializedSize);
@@ -2376,6 +2392,59 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
     }
 
+    @Test
+    public void testLeaderAddressInAppendEntries() {
+        logStart("testLeaderAddressInAppendEntries");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+            peerId -> leaderActor.path().toString());
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Initial heartbeat shouldn't have the leader address
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower needs the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+                RaftVersions.CURRENT_VERSION));
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertTrue(appendEntries.getLeaderAddress().isPresent());
+        assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().orElseThrow());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower does not need the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+                RaftVersions.CURRENT_VERSION));
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
             final ActorRef actorRef, final RaftRPC rpc) {
@@ -2383,13 +2452,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
 
-    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+    private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
 
         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
-            super();
             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
             this.snapshotChunkSize = snapshotChunkSize;
         }