+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
-public class FollowerTest extends AbstractRaftActorBehaviorTest {
+public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
- private RaftActorBehavior follower;
+ private Follower follower;
+
+ private final short payloadVersion = 5;
@Override
@After
}
@Override
- protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
- return new Follower(actorContext);
+ protected Follower createBehavior(RaftActorContext actorContext) {
+ return spy(new Follower(actorContext));
}
@Override
@Override
protected MockRaftActorContext createActorContext(ActorRef actorRef){
- return new MockRaftActorContext("follower", getSystem(), actorRef);
+ MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+ context.setPayloadVersion(payloadVersion );
+ return context;
}
@Test
follower = new Follower(createActorContext());
- RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
+ RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
assertTrue(raftBehavior instanceof Candidate);
}
public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
- RaftActorContext context = createActorContext();
+ MockRaftActorContext context = createActorContext();
long term = 1000;
context.getTermInformation().update(term, null);
assertEquals("isVoteGranted", true, reply.isVoteGranted());
assertEquals("getTerm", term, reply.getTerm());
+ verify(follower).scheduleElection(any(FiniteDuration.class));
}
@Test
public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
- RaftActorContext context = createActorContext();
+ MockRaftActorContext context = createActorContext();
long term = 1000;
context.getTermInformation().update(term, "test");
RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
assertEquals("isVoteGranted", false, reply.isVoteGranted());
+ verify(follower, never()).scheduleElection(any(FiniteDuration.class));
}
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
+ context.getReplicatedLog().clear(0,2);
+ context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
+ context.getReplicatedLog().setSnapshotIndex(99);
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(2, 101, "foo"));
+
+ Assert.assertEquals(1, context.getReplicatedLog().size());
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, appendEntries);
+
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertFalse(syncStatus.isInitialSyncDone());
+ assertTrue("append entries reply should be true", reply.isSuccess());
+ }
+
+ @Test
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+ logStart("testHandleFirstAppendEntries");
+
+ MockRaftActorContext context = createActorContext();
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(2, 101, "foo"));
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, appendEntries);
+
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertFalse(syncStatus.isInitialSyncDone());
+ assertFalse("append entries reply should be false", reply.isSuccess());
+ }
+
+ @Test
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
+ logStart("testHandleFirstAppendEntries");
+
+ MockRaftActorContext context = createActorContext();
+ context.getReplicatedLog().clear(0,2);
+ context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
+ context.getReplicatedLog().setSnapshotIndex(99);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertFalse(syncStatus.isInitialSyncDone());
+ assertTrue("append entries reply should be true", reply.isSuccess());
+ }
+
+ @Test
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
+ logStart("testHandleFirstAppendEntries");
+
+ MockRaftActorContext context = createActorContext();
+ context.getReplicatedLog().clear(0,2);
+ context.getReplicatedLog().setSnapshotIndex(100);
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(2, 101, "foo"));
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, appendEntries);
+
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertFalse(syncStatus.isInitialSyncDone());
+ assertTrue("append entries reply should be true", reply.isSuccess());
+ }
+
+ @Test
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
+ logStart("testHandleFirstAppendEntries");
+
+ MockRaftActorContext context = createActorContext();
+ context.getReplicatedLog().clear(0,2);
+ context.getReplicatedLog().setSnapshotIndex(100);
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(2, 105, "foo"));
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
+
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, appendEntries);
+
+ FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertFalse(syncStatus.isInitialSyncDone());
+ assertFalse("append entries reply should be false", reply.isSuccess());
}
@Test
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+ appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
- appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+ appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+ appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
- appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+ appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
follower = createBehavior(context);
follower.handleMessage(leaderActor, appendEntries);
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
+ AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
follower = createBehavior(context);
// before the new behavior was created (1 in this case)
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
- AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+ short leaderPayloadVersion = 10;
+ String leaderId = "leader-1";
+ AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
follower = createBehavior(context);
assertEquals("Entry 3", entries.get(0), log.get(3));
assertEquals("Entry 4", entries.get(1), log.get(4));
+ assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
+ assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
+
expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
}
// before the new behavior was created (1 in this case)
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
- AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
follower = createBehavior(context);
expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
}
+ @Test
+ public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
+ logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(1, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(newReplicatedLogEntry(1, 0, "zero"));
+ log.append(newReplicatedLogEntry(1, 1, "one"));
+ log.append(newReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+ entries.add(newReplicatedLogEntry(2, 3, "three"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
+
+ context.setRaftPolicy(createRaftPolicy(false, true));
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ Assert.assertSame(follower, newBehavior);
+
+ expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
+ }
+
@Test
public void testHandleAppendEntriesPreviousLogEntryMissing(){
logStart("testHandleAppendEntriesPreviousLogEntryMissing");
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(newReplicatedLogEntry(1, 4, "four"));
- AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
follower = createBehavior(context);
follower = createBehavior(context);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
assertEquals("Next index", 2, log.last().getIndex() + 1);
assertEquals("Entry 1", entries.get(0), log.get(1));
entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
leaderActor.underlyingActor().clear();
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
assertEquals("Next index", 3, log.last().getIndex() + 1);
assertEquals("Entry 1", entries.get(0), log.get(1));
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(newReplicatedLogEntry(1, 4, "four"));
- AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
follower = createBehavior(context);
logStart("testHandleInstallSnapshot");
MockRaftActorContext context = createActorContext();
+ context.getTermInformation().update(1, "leader");
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
InstallSnapshot lastInstallSnapshot = null;
for(int i = 0; i < totalChunks; i++) {
- ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
follower.handleMessage(leaderActor, lastInstallSnapshot);
ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
ApplySnapshot.class);
Snapshot snapshot = applySnapshot.getSnapshot();
+ assertNotNull(lastInstallSnapshot);
assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
snapshot.getLastAppliedTerm());
snapshot.getLastAppliedIndex());
assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+ assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+ assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+ applySnapshot.getCallback().onSuccess();
List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
leaderActor, InstallSnapshotReply.class);
assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
}
- assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+ assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+ }
+
+
+ /**
+ * Verify that when an AppendEntries is sent to a follower during a snapshot install
+ * the Follower short-circuits the processing of the AppendEntries message.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(follower.getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(follower.getSnapshotTracker());
+
+ // Send an append entry
+ AppendEntries appendEntries = mock(AppendEntries.class);
+ doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
+ verify(appendEntries, never()).getPrevLogIndex();
+
}
@Test
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
InstallSnapshot lastInstallSnapshot = null;
for(int i = 0; i < totalChunks; i++) {
- ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
follower.handleMessage(leaderActor, lastInstallSnapshot);
newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
- AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
follower.handleMessage(leaderActor, appendEntries);
syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
getNextChunk(bsSnapshot, 10, 50), 3, 3);
assertEquals("getTerm", 1, reply.getTerm());
assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
- assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+ assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ follower = createBehavior(context);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+ }
+
+ @Test
+ public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
+ MockRaftActorContext context = createActorContext();
+ context.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ context.setRaftPolicy(createRaftPolicy(false, false));
+
+ follower = createBehavior(context);
+
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
}
- public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
+ @Test
+ public void testElectionScheduledWhenAnyRaftRPCReceived(){
+ MockRaftActorContext context = createActorContext();
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, new RaftRPC() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTerm() {
+ return 100;
+ }
+ });
+ verify(follower).scheduleElection(any(FiniteDuration.class));
+ }
+
+ @Test
+ public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
+ MockRaftActorContext context = createActorContext();
+ follower = createBehavior(context);
+ follower.handleMessage(leaderActor, "non-raft-rpc");
+ verify(follower, never()).scheduleElection(any(FiniteDuration.class));
+ }
+
+ public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
int size = chunkSize;
size = snapshotLength - start;
}
}
- return bs.substring(start, start + size);
+
+ byte[] nextChunk = new byte[size];
+ bs.copyTo(nextChunk, start, 0, size);
+ return nextChunk;
}
private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+ expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+ }
+
+ private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+ String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+ boolean expForceInstallSnapshot) {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
AppendEntriesReply.class);
assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+ assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+ assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
}
- private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+
+ private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
new MockRaftActorContext.MockPayload(data));
}
+ private ByteString createSnapshot(){
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ return toByteString(followerSnapshot);
+ }
+
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
- String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
+ String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
}
@Override
- protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+ protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
throws Exception {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
assertEquals("isSuccess", true, reply.isSuccess());