X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=b01fd33914f9500c8537cc86032aee924b54f7ed;hp=34cdd5b67ef3e31af457b6ab554bf3e72275a2f3;hb=95d7b8820236d16cb7e37c4a95fcae6f6d55581e;hpb=471ac7d59e93db65fd49733d5185bc1996f7ad43 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 34cdd5b67e..b01fd33914 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,36 +1,60 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; -import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; -public class FollowerTest extends AbstractRaftActorBehaviorTest { - - private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); +public class FollowerTest extends AbstractRaftActorBehaviorTest { private final TestActorRef followerActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); @@ -38,20 +62,23 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); - private RaftActorBehavior follower; + private Follower follower; + + private final short payloadVersion = 5; + @Override @After public void tearDown() throws Exception { if(follower != null) { follower.close(); } - actorFactory.close(); + super.tearDown(); } @Override - protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Follower(actorContext); + protected Follower createBehavior(RaftActorContext actorContext) { + return spy(new Follower(actorContext)); } @Override @@ -61,7 +88,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef){ - return new MockRaftActorContext("follower", getSystem(), actorRef); + MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); + context.setPayloadVersion(payloadVersion ); + return context; } @Test @@ -69,26 +98,60 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext actorContext = createActorContext(); follower = new Follower(actorContext); - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class, + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class, actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis()); } @Test - public void testHandleElectionTimeout(){ - logStart("testHandleElectionTimeout"); + public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived"); - follower = new Follower(createActorContext()); + MockRaftActorContext context = createActorContext(); + follower = new Follower(context); - RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout()); + Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(), + TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); assertTrue(raftBehavior instanceof Candidate); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + public void testHandleElectionTimeoutWhenLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenLeaderMessageReceived"); + + MockRaftActorContext context = createActorContext(); + ((DefaultConfigParamsImpl) context.getConfigParams()). + setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4); + + follower = new Follower(context); + context.setCurrentBehavior(follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams(). + getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams(). + getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() { logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); long term = 1000; context.getTermInformation().update(term, null); @@ -100,13 +163,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("isVoteGranted", true, reply.isVoteGranted()); assertEquals("getTerm", term, reply.getTerm()); + verify(follower).scheduleElection(any(FiniteDuration.class)); } @Test public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); long term = 1000; context.getTermInformation().update(term, "test"); @@ -117,8 +181,283 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); assertEquals("isVoteGranted", false, reply.isVoteGranted()); + verify(follower, never()).scheduleElection(any(FiniteDuration.class)); + } + + + @Test + public void testHandleFirstAppendEntries() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + Assert.assertEquals(1, context.getReplicatedLog().size()); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 105, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleSyncUpAppendEntries() throws Exception { + logStart("testHandleSyncUpAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + followerActor.underlyingActor().clear(); + + // Sending the same message again should not generate another message + + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertNull(syncStatus); + + } + + @Test + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + } + + @Test + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the @@ -142,7 +481,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -168,7 +507,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1); + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); follower = createBehavior(context); @@ -216,7 +555,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); + short leaderPayloadVersion = 10; + String leaderId = "leader-1"; + AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion); follower = createBehavior(context); @@ -228,6 +569,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Entry 3", entries.get(0), log.get(3)); assertEquals("Entry 4", entries.get(1), log.get(4)); + assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion()); + assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId()); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } @@ -263,7 +607,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1); + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); follower = createBehavior(context); @@ -288,6 +632,44 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3); } + @Test + public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() { + logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); + + context.setRaftPolicy(createRaftPolicy(false, true)); + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true); + } + @Test public void testHandleAppendEntriesPreviousLogEntryMissing(){ logStart("testHandleAppendEntriesPreviousLogEntryMissing"); @@ -306,7 +688,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { List entries = new ArrayList<>(); entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0); follower = createBehavior(context); @@ -337,7 +719,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1)); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0)); assertEquals("Next index", 2, log.last().getIndex() + 1); assertEquals("Entry 1", entries.get(0), log.get(1)); @@ -349,7 +731,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); leaderActor.underlyingActor().clear(); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1)); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0)); assertEquals("Next index", 3, log.last().getIndex() + 1); assertEquals("Entry 1", entries.get(0), log.get(1)); @@ -359,7 +741,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleAppendAfterInstallingSnapshot(){ + public void testHandleAppendEntriesAfterInstallingSnapshot(){ logStart("testHandleAppendAfterInstallingSnapshot"); MockRaftActorContext context = createActorContext(); @@ -377,7 +759,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { List entries = new ArrayList<>(); entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0); follower = createBehavior(context); @@ -400,25 +782,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { logStart("testHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); + context.getTermInformation().update(1, "leader"); follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; for(int i = 0; i < totalChunks; i++) { - ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, chunkData, chunkIndex, totalChunks); follower.handleMessage(leaderActor, lastInstallSnapshot); @@ -430,6 +808,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor, ApplySnapshot.class); Snapshot snapshot = applySnapshot.getSnapshot(); + assertNotNull(lastInstallSnapshot); assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex()); assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastAppliedTerm()); @@ -437,6 +816,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); + applySnapshot.getCallback().onSuccess(); List replies = MessageCollectorActor.getAllMatching( leaderActor, InstallSnapshotReply.class); @@ -450,7 +832,153 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); } - Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker()); + assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); + } + + + /** + * Verify that when an AppendEntries is sent to a follower during a snapshot install + * the Follower short-circuits the processing of the AppendEntries message. + * + * @throws Exception + */ + @Test + public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send an append entry + AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + assertNotNull(follower.getSnapshotTracker()); + } + + @Test + public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send appendEntries with a new term and leader. + AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", 2, reply.getLogLastIndex()); + assertEquals("getLogLastTerm", 2, reply.getLogLastTerm()); + assertEquals("getTerm", 2, reply.getTerm()); + + assertNull(follower.getSnapshotTracker()); + } + + @Test + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + logStart("testInitialSyncUpWithHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + context.setCommitIndex(-1); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } + + FollowerInitialSyncUpStatus syncStatus = + MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); } @Test @@ -461,12 +989,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, getNextChunk(bsSnapshot, 10, 50), 3, 3); @@ -480,25 +1003,111 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getTerm", 1, reply.getTerm()); assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); - Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker()); + assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); + } + + @Test + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + follower = createBehavior(context); + + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); + assertTrue("Expected Candidate", newBehavior instanceof Candidate); } - public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ + @Test + public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){ + MockRaftActorContext context = createActorContext(); + context.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public FiniteDuration getElectionTimeOutInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + context.setRaftPolicy(createRaftPolicy(false, false)); + + follower = createBehavior(context); + + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); + assertSame("handleMessage result", follower, newBehavior); + } + + @Test + public void testFollowerSchedulesElectionIfNonVoting(){ + MockRaftActorContext context = createActorContext(); + context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false)))); + ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval( + FiniteDuration.apply(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1); + + follower = new Follower(context, "leader", (short)1); + + ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor, + ElectionTimeout.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout); + assertSame("handleMessage result", follower, newBehavior); + assertNull("Expected null leaderId", follower.getLeaderId()); + } + + @Test + public void testElectionScheduledWhenAnyRaftRPCReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, new RaftRPC() { + private static final long serialVersionUID = 1L; + + @Override + public long getTerm() { + return 100; + } + }); + verify(follower).scheduleElection(any(FiniteDuration.class)); + } + + @Test + public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, "non-raft-rpc"); + verify(follower, never()).scheduleElection(any(FiniteDuration.class)); + } + + public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; int size = chunkSize; if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + chunkSize) > snapshotLength) { + if (start + chunkSize > snapshotLength) { size = snapshotLength - start; } } - return bs.substring(start, start + size); + + byte[] nextChunk = new byte[size]; + bs.copyTo(nextChunk, start, 0, size); + return nextChunk; } private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); + } + + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex, + boolean expForceInstallSnapshot) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); @@ -508,10 +1117,38 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getFollowerId", expFollowerId, reply.getFollowerId()); assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); + assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); + assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); } - private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + + private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { return new MockRaftActorContext.MockReplicatedLogEntry(term, index, new MockRaftActorContext.MockPayload(data)); } + + private ByteString createSnapshot(){ + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + return toByteString(followerSnapshot); + } + + @Override + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, + ActorRef actorRef, RaftRPC rpc) throws Exception { + super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); + + String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null; + assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor()); + } + + @Override + protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) + throws Exception { + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + } }