X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=8006d5a6ac906ed6a11598b59f5c6aba6269f109;hb=8e1d3c4f9001fbc8a5d3d3bea57916c5099078b2;hp=9d64b140fb7d9db2c191eb12ffc847deb5273561;hpb=fe8352361d49c76a0ecc80162a2b8258d35198b5;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 9d64b140fb..8006d5a6ac 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -5,41 +5,51 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; -import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.protobuf.ByteString; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import com.google.protobuf.ByteString; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; +import java.io.OutputStream; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.MockRaftActor; +import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver; +import org.opendaylight.controller.cluster.raft.PeerAddressResolver; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -50,19 +60,27 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; +import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; public class FollowerTest extends AbstractRaftActorBehaviorTest { - private final TestActorRef followerActor = actorFactory.createTestActor( - Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); + private final ActorRef followerActor = actorFactory.createActor( + MessageCollectorActor.props(), actorFactory.generateActorId("follower")); - private final TestActorRef leaderActor = actorFactory.createTestActor( - Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); + private final ActorRef leaderActor = actorFactory.createActor( + MessageCollectorActor.props(), actorFactory.generateActorId("leader")); private Follower follower; @@ -70,7 +88,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override @After - public void tearDown() throws Exception { + public void tearDown() { if (follower != null) { follower.close(); } @@ -79,7 +97,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected Follower createBehavior(RaftActorContext actorContext) { + protected Follower createBehavior(final RaftActorContext actorContext) { return spy(new Follower(actorContext)); } @@ -89,9 +107,11 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef) { + protected MockRaftActorContext createActorContext(final ActorRef actorRef) { MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); - context.setPayloadVersion(payloadVersion ); + context.setPayloadVersion(payloadVersion); + ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver( + peerId -> leaderActor.path().toString()); return context; } @@ -132,7 +152,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { Uninterruptibles.sleepUninterruptibly(context.getConfigParams() .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short) 1)); Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS); @@ -141,7 +161,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { Uninterruptibles.sleepUninterruptibly(context.getConfigParams() .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short) 1)); Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); @@ -188,7 +208,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Test - public void testHandleFirstAppendEntries() throws Exception { + public void testHandleFirstAppendEntries() { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -196,10 +216,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); context.getReplicatedLog().setSnapshotIndex(99); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); - Assert.assertEquals(1, context.getReplicatedLog().size()); + assertEquals(1, context.getReplicatedLog().size()); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); @@ -216,13 +235,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); @@ -239,17 +257,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() - throws Exception { - logStart("testHandleFirstAppendEntries"); + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() { + logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog"); MockRaftActorContext context = createActorContext(); context.getReplicatedLog().clear(0,2); context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar")); context.getReplicatedLog().setSnapshotIndex(99); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); @@ -266,16 +282,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() - throws Exception { - logStart("testHandleFirstAppendEntries"); + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() { + logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot"); MockRaftActorContext context = createActorContext(); context.getReplicatedLog().clear(0,2); context.getReplicatedLog().setSnapshotIndex(100); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); @@ -292,8 +306,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing() - throws Exception { + public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() { logStart( "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing"); @@ -301,8 +314,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.getReplicatedLog().clear(0,2); context.getReplicatedLog().setSnapshotIndex(100); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 105, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 105, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0); @@ -319,13 +331,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleSyncUpAppendEntries() throws Exception { + public void testHandleSyncUpAppendEntries() { logStart("testHandleSyncUpAppendEntries"); MockRaftActorContext context = createActorContext(); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); @@ -339,15 +350,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertFalse(syncStatus.isInitialSyncDone()); // Clear all the messages - followerActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(followerActor); context.setLastApplied(101); context.setCommitIndex(101); - setLastLogEntry(context, 1, 101, - new MockRaftActorContext.MockPayload("")); + setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload("")); - entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); @@ -357,7 +366,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertTrue(syncStatus.isInitialSyncDone()); - followerActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(followerActor); // Sending the same message again should not generate another message @@ -366,17 +375,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); assertNull(syncStatus); - } @Test - public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() { logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); MockRaftActorContext context = createActorContext(); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); @@ -390,14 +397,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertFalse(syncStatus.isInitialSyncDone()); // Clear all the messages - followerActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(followerActor); context.setLastApplied(100); setLastLogEntry(context, 1, 100, new MockRaftActorContext.MockPayload("")); - entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // leader-2 is becoming the leader now and it says the commitIndex is 45 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); @@ -407,18 +413,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // We get a new message saying initial status is not done assertFalse(syncStatus.isInitialSyncDone()); - } - @Test - public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() { logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); MockRaftActorContext context = createActorContext(); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); @@ -432,15 +435,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertFalse(syncStatus.isInitialSyncDone()); // Clear all the messages - followerActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(followerActor); context.setLastApplied(101); context.setCommitIndex(101); setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload("")); - entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); @@ -451,14 +453,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertTrue(syncStatus.isInitialSyncDone()); // Clear all the messages - followerActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(followerActor); context.setLastApplied(100); setLastLogEntry(context, 1, 100, new MockRaftActorContext.MockPayload("")); - entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // leader-2 is becoming the leader now and it says the commitIndex is 45 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); @@ -468,10 +469,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // We get a new message saying initial status is not done assertFalse(syncStatus.isInitialSyncDone()); - } - /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the @@ -479,7 +478,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * sets it current applied state to the commitIndex of the sender. */ @Test - public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { + public void testHandleAppendEntriesWithNewerCommitIndex() { logStart("testHandleAppendEntriesWithNewerCommitIndex"); MockRaftActorContext context = createActorContext(); @@ -489,8 +488,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockPayload("")); context.getReplicatedLog().setSnapshotIndex(99); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); @@ -502,7 +500,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } /** - * This test verifies that when an AppendEntries is received a specific prevLogTerm + * This test verifies that when an AppendEntries is received with a prevLogTerm * which does not match the term that is in RaftActors log entry at prevLogIndex * then the RaftActor does not change it's state and it returns a failure. */ @@ -512,18 +510,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext context = createActorContext(); - // First set the receivers term to lower number - context.getTermInformation().update(95, "test"); - - // AppendEntries is now sent with a bigger term - // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); + AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0); follower = createBehavior(context); RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - Assert.assertSame(follower, newBehavior); + assertSame(follower, newBehavior); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); @@ -531,6 +524,28 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("isSuccess", false, reply.isSuccess()); } + @Test + public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() { + logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot"); + + MockRaftActorContext context = createActorContext(); + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build()); + context.getReplicatedLog().setSnapshotIndex(4); + context.getReplicatedLog().setSnapshotTerm(3); + + AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + assertSame(follower, newBehavior); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertEquals("isSuccess", true, reply.isSuccess()); + } + /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver match that the new @@ -555,9 +570,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setReplicatedLog(log); // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add(newReplicatedLogEntry(1, 3, "three")); - entries.add(newReplicatedLogEntry(1, 4, "four")); + List entries = List.of( + newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four")); // Send appendEntries with the same term as was set on the receiver // before the new behavior was created (1 in this case) @@ -571,7 +585,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - Assert.assertSame(follower, newBehavior); + assertSame(follower, newBehavior); assertEquals("Next index", 5, log.last().getIndex() + 1); assertEquals("Entry 3", entries.get(0), log.get(3)); @@ -607,9 +621,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setReplicatedLog(log); // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add(newReplicatedLogEntry(2, 2, "two-1")); - entries.add(newReplicatedLogEntry(2, 3, "three")); + List entries = List.of( + newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three")); // Send appendEntries with the same term as was set on the receiver // before the new behavior was created (1 in this case) @@ -621,7 +634,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - Assert.assertSame(follower, newBehavior); + assertSame(follower, newBehavior); // The entry at index 2 will be found out-of-sync with the leader // and will be removed @@ -658,9 +671,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setReplicatedLog(log); // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add(newReplicatedLogEntry(2, 2, "two-1")); - entries.add(newReplicatedLogEntry(2, 3, "three")); + List entries = List.of( + newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three")); // Send appendEntries with the same term as was set on the receiver // before the new behavior was created (1 in this case) @@ -673,7 +685,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - Assert.assertSame(follower, newBehavior); + assertSame(follower, newBehavior); expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true); } @@ -693,8 +705,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setReplicatedLog(log); // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add(newReplicatedLogEntry(1, 4, "four")); + List entries = List.of(newReplicatedLogEntry(1, 4, "four")); AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0); @@ -702,7 +713,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - Assert.assertSame(follower, newBehavior); + assertSame(follower, newBehavior); expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2); } @@ -723,7 +734,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setReplicatedLog(log); // Send the last entry again. - List entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one")); + List entries = List.of(newReplicatedLogEntry(1, 1, "one")); follower = createBehavior(context); @@ -736,9 +747,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // Send the last entry again and also a new one. - entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); + entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); - leaderActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(leaderActor); follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0)); assertEquals("Next index", 3, log.last().getIndex() + 1); @@ -764,8 +775,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setReplicatedLog(log); // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add(newReplicatedLogEntry(1, 4, "four")); + List entries = List.of(newReplicatedLogEntry(1, 4, "four")); AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0); @@ -773,18 +783,17 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - Assert.assertSame(follower, newBehavior); + assertSame(follower, newBehavior); expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } - /** * This test verifies that when InstallSnapshot is received by * the follower its applied correctly. */ @Test - public void testHandleInstallSnapshot() throws Exception { + public void testHandleInstallSnapshot() { logStart("testHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -792,7 +801,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - ByteString bsSnapshot = createSnapshot(); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -821,7 +830,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); - Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getState type", ByteState.class, snapshot.getState().getClass()); + assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes()); assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); applySnapshot.getCallback().onSuccess(); @@ -841,13 +851,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); } - /** * Verify that when an AppendEntries is sent to a follower during a snapshot install * the Follower short-circuits the processing of the AppendEntries message. */ @Test - public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + public void testReceivingAppendEntriesDuringInstallSnapshot() { logStart("testReceivingAppendEntriesDuringInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -876,7 +885,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // Send an append entry AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1, - Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); + List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); follower.handleMessage(leaderActor, appendEntries); @@ -890,7 +899,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception { + public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() { logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader"); MockRaftActorContext context = createActorContext(); @@ -919,7 +928,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // Send appendEntries with a new term and leader. AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1, - Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); + List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); follower.handleMessage(leaderActor, appendEntries); @@ -933,7 +942,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() { logStart("testInitialSyncUpWithHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -966,15 +975,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertFalse(syncStatus.isInitialSyncDone()); // Clear all the messages - followerActor.underlyingActor().clear(); + MessageCollectorActor.clearMessages(followerActor); context.setLastApplied(101); context.setCommitIndex(101); setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload("")); - List entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + List entries = List.of(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0); @@ -986,7 +994,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleOutOfSequenceInstallSnapshot() throws Exception { + public void testHandleOutOfSequenceInstallSnapshot() { logStart("testHandleOutOfSequenceInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -1050,7 +1058,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Test public void testFollowerSchedulesElectionIfNonVoting() { MockRaftActorContext context = createActorContext(); - context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false)))); + context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false)))); ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval( FiniteDuration.apply(100, TimeUnit.MILLISECONDS)); ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1); @@ -1087,16 +1095,260 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } - public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) { + @Test + public void testCaptureSnapshotOnLastEntryInAppendEntries() { + String id = "testCaptureSnapshotOnLastEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = List.of( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 1 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex()); + assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()), + MockRaftActor.fromState(snapshot.getState())); + } + + @Test + public void testCaptureSnapshotOnMiddleEntryInAppendEntries() { + String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = List.of( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 2 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState())); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex()); + + // Reinstate the actor from persistence + + actorFactory.killActor(followerActorRef, new TestKit(getSystem())); + + followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex()); + assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied()); + assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex()); + assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), followerRaftActor.get().getState()); + } + + @Test + public void testCaptureSnapshotOnAppendEntriesWithUnapplied() { + String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(1); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = List.of( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 0 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", List.of(entries.get(0).getData()), + MockRaftActor.fromState(snapshot.getState())); + } + + @Test + public void testNeedsLeaderAddress() { + logStart("testNeedsLeaderAddress"); + + MockRaftActorContext context = createActorContext(); + context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog()); + context.addToPeers("leader", null, VotingState.VOTING); + ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE); + + follower = createBehavior(context); + + follower.handleMessage(leaderActor, + new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0)); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertTrue(reply.isNeedsLeaderAddress()); + MessageCollectorActor.clearMessages(leaderActor); + + PeerAddressResolver mockResolver = mock(PeerAddressResolver.class); + ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver); + + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, + (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString())); + + reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertFalse(reply.isNeedsLeaderAddress()); + + verify(mockResolver).setResolved("leader", leaderActor.path().toString()); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private static RaftActorSnapshotCohort newRaftActorSnapshotCohort( + final AtomicReference followerRaftActor) { + RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { + @Override + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { + try { + actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()), + installSnapshotStream), actorRef); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void applySnapshot(final State snapshotState) { + } + + @Override + public State deserializeSnapshot(final ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); + } + }; + return snapshotCohort; + } + + public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) { int snapshotLength = bs.size(); int start = offset; int size = chunkSize; if (chunkSize > snapshotLength) { size = snapshotLength; - } else { - if (start + chunkSize > snapshotLength) { - size = snapshotLength - start; - } + } else if (start + chunkSize > snapshotLength) { + size = snapshotLength - start; } byte[] nextChunk = new byte[size]; @@ -1104,14 +1356,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { return nextChunk; } - private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, - String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess, + final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) { expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); } - private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, - String expFollowerId, long expLogLastTerm, long expLogLastIndex, - boolean expForceInstallSnapshot) { + private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess, + final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex, + final boolean expForceInstallSnapshot) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); @@ -1123,26 +1375,22 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); + assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress()); } - private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) { return new SimpleReplicatedLogEntry(index, term, new MockRaftActorContext.MockPayload(data)); } private ByteString createSnapshot() { - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - return toByteString(followerSnapshot); + return toByteString(Map.of("1", "A", "2", "B", "3", "C")); } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null; @@ -1150,8 +1398,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) - throws Exception { + protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); assertEquals("isSuccess", true, reply.isSuccess()); }