X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=3e6c7590c0b80cea60bb50148e015bc35c1d3207;hp=b8be7be2ae2aff014b3150359a4fbe387014a47c;hb=b5cb353e3553a39f576c284119af75ffa5ea66a9;hpb=a57559cab8f0dd9204fe4848c85d96659115b63b diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index b8be7be2ae..3e6c7590c0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -15,34 +15,46 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; + import akka.actor.ActorRef; import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.MockRaftActor; +import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; +import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; -import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -50,6 +62,17 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; +import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; @@ -67,8 +90,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override @After - public void tearDown() throws Exception { - if(follower != null) { + public void tearDown() { + if (follower != null) { follower.close(); } @@ -76,7 +99,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected Follower createBehavior(RaftActorContext actorContext) { + protected Follower createBehavior(final RaftActorContext actorContext) { return spy(new Follower(actorContext)); } @@ -86,34 +109,68 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef){ + protected MockRaftActorContext createActorContext(final ActorRef actorRef) { MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); - context.setPayloadVersion(payloadVersion ); + context.setPayloadVersion(payloadVersion); return context; } @Test - public void testThatAnElectionTimeoutIsTriggered(){ + public void testThatAnElectionTimeoutIsTriggered() { MockRaftActorContext actorContext = createActorContext(); follower = new Follower(actorContext); - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class, + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class, actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis()); } @Test - public void testHandleElectionTimeout(){ - logStart("testHandleElectionTimeout"); + public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived"); - follower = new Follower(createActorContext()); + MockRaftActorContext context = createActorContext(); + follower = new Follower(context); - RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE); + Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(), + TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); assertTrue(raftBehavior instanceof Candidate); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + public void testHandleElectionTimeoutWhenLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenLeaderMessageReceived"); + + MockRaftActorContext context = createActorContext(); + ((DefaultConfigParamsImpl) context.getConfigParams()) + .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4); + + follower = new Follower(context); + context.setCurrentBehavior(follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams() + .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams() + .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() { logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); MockRaftActorContext context = createActorContext(); @@ -132,7 +189,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() { logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); MockRaftActorContext context = createActorContext(); @@ -151,7 +208,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Test - public void testHandleFirstAppendEntries() throws Exception { + public void testHandleFirstAppendEntries() { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -170,7 +227,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -178,7 +236,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -192,7 +250,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -200,7 +259,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -217,7 +276,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -225,7 +285,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -241,7 +301,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -249,8 +310,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { - logStart("testHandleFirstAppendEntries"); + public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() { + logStart( + "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing"); MockRaftActorContext context = createActorContext(); context.getReplicatedLog().clear(0,2); @@ -265,7 +327,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -273,7 +336,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleSyncUpAppendEntries() throws Exception { + public void testHandleSyncUpAppendEntries() { logStart("testHandleSyncUpAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -287,7 +350,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -296,11 +360,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setLastApplied(101); context.setCommitIndex(101); - setLastLogEntry(context, 1, 101, - new MockRaftActorContext.MockPayload("")); + setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload("")); - entries = Arrays.asList( - newReplicatedLogEntry(2, 101, "foo")); + entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); @@ -319,11 +381,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); assertNull(syncStatus); - } @Test - public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() { logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); MockRaftActorContext context = createActorContext(); @@ -337,7 +398,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -359,12 +421,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // We get a new message saying initial status is not done assertFalse(syncStatus.isInitialSyncDone()); - } - @Test - public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() { logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); MockRaftActorContext context = createActorContext(); @@ -378,7 +438,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -419,20 +480,16 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // We get a new message saying initial status is not done assertFalse(syncStatus.isInitialSyncDone()); - } - /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the * state machine of the RaftActor, the RaftActor applies the state and * sets it current applied state to the commitIndex of the sender. - * - * @throws Exception */ @Test - public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { + public void testHandleAppendEntriesWithNewerCommitIndex() { logStart("testHandleAppendEntriesWithNewerCommitIndex"); MockRaftActorContext context = createActorContext(); @@ -458,8 +515,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * This test verifies that when an AppendEntries is received a specific prevLogTerm * which does not match the term that is in RaftActors log entry at prevLogIndex * then the RaftActor does not change it's state and it returns a failure. - * - * @throws Exception */ @Test public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() { @@ -472,7 +527,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1, + (short)0); follower = createBehavior(context); @@ -490,9 +546,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver match that the new * entries get added to the log and the log is incremented by the number of - * entries received in appendEntries - * - * @throws Exception + * entries received in appendEntries. */ @Test public void testHandleAppendEntriesAddNewEntries() { @@ -544,7 +598,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver are out-of-sync that * the log is first corrected by removing the out of sync entries from the - * log and then adding in the new entries sent with the AppendEntries message + * log and then adding in the new entries sent with the AppendEntries message. */ @Test public void testHandleAppendEntriesCorrectReceiverLogEntries() { @@ -636,10 +690,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleAppendEntriesPreviousLogEntryMissing(){ + public void testHandleAppendEntriesPreviousLogEntryMissing() { logStart("testHandleAppendEntriesPreviousLogEntryMissing"); - MockRaftActorContext context = createActorContext(); + final MockRaftActorContext context = createActorContext(); // Prepare the receivers log MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); @@ -706,7 +760,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleAppendEntriesAfterInstallingSnapshot(){ + public void testHandleAppendEntriesAfterInstallingSnapshot() { logStart("testHandleAppendAfterInstallingSnapshot"); MockRaftActorContext context = createActorContext(); @@ -739,11 +793,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { /** * This test verifies that when InstallSnapshot is received by * the follower its applied correctly. - * - * @throws Exception */ @Test - public void testHandleInstallSnapshot() throws Exception { + public void testHandleInstallSnapshot() { logStart("testHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -751,16 +803,16 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - ByteString bsSnapshot = createSnapshot(); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; - for(int i = 0; i < totalChunks; i++) { + for (int i = 0; i < totalChunks; i++) { byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, chunkData, chunkIndex, totalChunks); @@ -780,7 +832,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); - Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getState type", ByteState.class, snapshot.getState().getClass()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes()); assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); applySnapshot.getCallback().onSuccess(); @@ -790,7 +843,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("InstallSnapshotReply count", totalChunks, replies.size()); chunkIndex = 1; - for(InstallSnapshotReply reply: replies) { + for (InstallSnapshotReply reply: replies) { assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex()); assertEquals("getTerm", 1, reply.getTerm()); assertEquals("isSuccess", true, reply.isSuccess()); @@ -800,15 +853,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); } - /** * Verify that when an AppendEntries is sent to a follower during a snapshot install * the Follower short-circuits the processing of the AppendEntries message. - * - * @throws Exception */ @Test - public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + public void testReceivingAppendEntriesDuringInstallSnapshot() { logStart("testReceivingAppendEntriesDuringInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -818,7 +868,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ByteString bsSnapshot = createSnapshot(); int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; // Check that snapshot installation is not in progress @@ -836,26 +886,69 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNotNull(follower.getSnapshotTracker()); // Send an append entry - AppendEntries appendEntries = mock(AppendEntries.class); - doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); follower.handleMessage(leaderActor, appendEntries); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); - assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); - assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm()); - // We should not hit the code that needs to look at prevLogIndex because we are short circuiting - verify(appendEntries, never()).getPrevLogIndex(); + assertNotNull(follower.getSnapshotTracker()); + } + + @Test + public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() { + logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send appendEntries with a new term and leader. + AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", 2, reply.getLogLastIndex()); + assertEquals("getLogLastTerm", 2, reply.getLogLastTerm()); + assertEquals("getTerm", 2, reply.getTerm()); + + assertNull(follower.getSnapshotTracker()); } @Test - public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() { logStart("testInitialSyncUpWithHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); + context.setCommitIndex(-1); follower = createBehavior(context); @@ -863,12 +956,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; - for(int i = 0; i < totalChunks; i++) { + for (int i = 0; i < totalChunks; i++) { byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, chunkData, chunkIndex, totalChunks); @@ -904,7 +997,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleOutOfSequenceInstallSnapshot() throws Exception { + public void testHandleOutOfSequenceInstallSnapshot() { logStart("testHandleOutOfSequenceInstallSnapshot"); MockRaftActorContext context = createActorContext(); @@ -929,28 +1022,27 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() { MockRaftActorContext context = createActorContext(); Stopwatch stopwatch = Stopwatch.createStarted(); follower = createBehavior(context); - ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor, - ElectionTimeout.class); + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); - RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); assertTrue("Expected Candidate", newBehavior instanceof Candidate); } @Test - public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){ + public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() { MockRaftActorContext context = createActorContext(); - context.setConfigParams(new DefaultConfigParamsImpl(){ + context.setConfigParams(new DefaultConfigParamsImpl() { @Override public FiniteDuration getElectionTimeOutInterval() { return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); @@ -961,14 +1053,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor, - ElectionTimeout.class); - RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout); + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); assertSame("handleMessage result", follower, newBehavior); } @Test - public void testFollowerSchedulesElectionIfNonVoting(){ + public void testFollowerSchedulesElectionIfNonVoting() { MockRaftActorContext context = createActorContext(); context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false)))); ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval( @@ -985,7 +1076,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testElectionScheduledWhenAnyRaftRPCReceived(){ + public void testElectionScheduledWhenAnyRaftRPCReceived() { MockRaftActorContext context = createActorContext(); follower = createBehavior(context); follower.handleMessage(leaderActor, new RaftRPC() { @@ -1000,21 +1091,238 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){ + public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() { MockRaftActorContext context = createActorContext(); follower = createBehavior(context); follower.handleMessage(leaderActor, "non-raft-rpc"); verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } - public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){ + @Test + public void testCaptureSnapshotOnLastEntryInAppendEntries() { + String id = "testCaptureSnapshotOnLastEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 1 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()), + MockRaftActor.fromState(snapshot.getState())); + } + + @Test + public void testCaptureSnapshotOnMiddleEntryInAppendEntries() { + String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 2 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState())); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex()); + + // Reinstate the actor from persistence + + actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem())); + + followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex()); + assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied()); + assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex()); + assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), followerRaftActor.get().getState()); + } + + @Test + public void testCaptureSnapshotOnAppendEntriesWithUnapplied() { + String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(1); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + InMemoryJournal.waitForWriteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 0 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()), + MockRaftActor.fromState(snapshot.getState())); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private static RaftActorSnapshotCohort newRaftActorSnapshotCohort( + final AtomicReference followerRaftActor) { + RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { + @Override + public void createSnapshot(final ActorRef actorRef, + final java.util.Optional installSnapshotStream) { + try { + actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()), + installSnapshotStream), actorRef); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void applySnapshot(final State snapshotState) { + } + + @Override + public State deserializeSnapshot(final ByteSource snapshotBytes) { + throw new UnsupportedOperationException(); + } + }; + return snapshotCohort; + } + + public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) { int snapshotLength = bs.size(); int start = offset; int size = chunkSize; if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + chunkSize) > snapshotLength) { + if (start + chunkSize > snapshotLength) { size = snapshotLength - start; } } @@ -1024,14 +1332,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { return nextChunk; } - private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, - String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess, + final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) { expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); } - private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, - String expFollowerId, long expLogLastTerm, long expLogLastIndex, - boolean expForceInstallSnapshot) { + private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess, + final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex, + final boolean expForceInstallSnapshot) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); @@ -1046,12 +1354,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } - private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { - return new MockRaftActorContext.MockReplicatedLogEntry(term, index, + private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) { + return new SimpleReplicatedLogEntry(index, term, new MockRaftActorContext.MockPayload(data)); } - private ByteString createSnapshot(){ + private ByteString createSnapshot() { HashMap followerSnapshot = new HashMap<>(); followerSnapshot.put("1", "A"); followerSnapshot.put("2", "B"); @@ -1061,8 +1369,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null; @@ -1070,8 +1378,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) - throws Exception { + protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); assertEquals("isSuccess", true, reply.isSuccess()); }