1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.Snapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
31 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Message-collecting test actor backing the follower's context (see createActorContext());
// messages addressed to the follower itself are looked up on this actor.
43 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
44 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Message-collecting test actor passed as the first argument to handleMessage();
// the replies produced by the follower are looked up on this actor.
46 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
47 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// Behavior under test; assigned by each test and null-checked in tearDown().
49 private RaftActorBehavior follower;
// Tear-down hook. Clean-up of the behavior is guarded by a null check because
// a test may finish without ever assigning the follower field.
53 public void tearDown() throws Exception {
54 if(follower != null) {
// Factory hook: the behavior under test is a Follower built on the supplied context.
62 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
63 return new Follower(actorContext);
// Creates the default mock context, using followerActor as the context's actor.
67 protected MockRaftActorContext createActorContext() {
68 return createActorContext(followerActor);
// Creates a MockRaftActorContext with the id "follower" on the test actor system,
// using the given actorRef as the context's actor.
72 protected MockRaftActorContext createActorContext(ActorRef actorRef){
73 return new MockRaftActorContext("follower", getSystem(), actorRef);
// Verifies that merely constructing a Follower results in an ElectionTimeout
// message arriving at followerActor.
77 public void testThatAnElectionTimeoutIsTriggered(){
78 MockRaftActorContext actorContext = createActorContext();
79 follower = new Follower(actorContext);
// The last argument is six times the configured election timeout interval, in
// milliseconds -- presumably the maximum time to wait for the message.
81 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
82 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that handling an ElectionTimeout makes the follower return a
// Candidate as the next behavior.
86 public void testHandleElectionTimeout(){
87 logStart("testHandleElectionTimeout");
89 follower = new Follower(createActorContext());
91 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
93 assertTrue(raftBehavior instanceof Candidate);
// Verifies that a RequestVote carrying the follower's current term is granted
// when the follower has not voted yet (votedFor is null), and that the reply
// echoes that term.
97 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
98 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
100 RaftActorContext context = createActorContext();
// Current term is set with no recorded vote.
102 context.getTermInformation().update(term, null);
104 follower = createBehavior(context);
106 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
108 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
110 assertEquals("isVoteGranted", true, reply.isVoteGranted());
111 assertEquals("getTerm", term, reply.getTerm());
// Verifies that a RequestVote from "candidate" is denied when the follower has
// already recorded a vote for a different id ("test") in the same term.
115 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
116 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
118 RaftActorContext context = createActorContext();
// The follower's vote in this term is already taken by "test".
120 context.getTermInformation().update(term, "test");
122 follower = createBehavior(context);
124 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
126 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
128 assertEquals("isVoteGranted", false, reply.isVoteGranted());
// Verifies that handling the first AppendEntries on a fresh context sends a
// FollowerInitialSyncUpStatus to followerActor reporting that the initial sync
// is not yet done.
133 public void testHandleFirstAppendEntries() throws Exception {
134 logStart("testHandleFirstAppendEntries");
136 MockRaftActorContext context = createActorContext();
138 List<ReplicatedLogEntry> entries = Arrays.asList(
139 newReplicatedLogEntry(2, 101, "foo"));
141 // The new commitIndex is 101
142 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
144 follower = createBehavior(context);
145 follower.handleMessage(leaderActor, appendEntries);
147 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
149 assertFalse(syncStatus.isInitialSyncDone());
// Walks the follower through initial sync-up in three steps:
//   1. the first AppendEntries yields a "sync not done" status,
//   2. after the context is advanced to index 101, a second AppendEntries
//      yields a "sync done" status,
//   3. repeating that second AppendEntries yields no further status message.
153 public void testHandleSyncUpAppendEntries() throws Exception {
154 logStart("testHandleSyncUpAppendEntries");
156 MockRaftActorContext context = createActorContext();
158 List<ReplicatedLogEntry> entries = Arrays.asList(
159 newReplicatedLogEntry(2, 101, "foo"));
161 // The new commitIndex is 101
162 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
164 follower = createBehavior(context);
165 follower.handleMessage(leaderActor, appendEntries);
167 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
169 assertFalse(syncStatus.isInitialSyncDone());
171 // Clear all the messages
172 followerActor.underlyingActor().clear();
// Advance the follower's applied/commit index and last log entry to 101
// before the next AppendEntries is handled.
174 context.setLastApplied(101);
175 context.setCommitIndex(101);
176 setLastLogEntry(context, 1, 101,
177 new MockRaftActorContext.MockPayload(""));
179 entries = Arrays.asList(
180 newReplicatedLogEntry(2, 101, "foo"));
182 // The new commitIndex is 101
183 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
184 follower.handleMessage(leaderActor, appendEntries);
186 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
188 assertTrue(syncStatus.isInitialSyncDone());
190 followerActor.underlyingActor().clear();
192 // Sending the same message again should not generate another message
194 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (rather than expectFirstMatching) is used so that the
// absence of a message can be asserted via a null result.
196 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
198 assertNull(syncStatus);
// Verifies that when an AppendEntries naming a different leader ("leader-2")
// arrives while the initial sync with "leader-1" is still incomplete, the
// follower again reports a "sync not done" status.
203 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
204 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
206 MockRaftActorContext context = createActorContext();
208 List<ReplicatedLogEntry> entries = Arrays.asList(
209 newReplicatedLogEntry(2, 101, "foo"));
211 // The new commitIndex is 101
212 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
214 follower = createBehavior(context);
215 follower.handleMessage(leaderActor, appendEntries);
217 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
219 assertFalse(syncStatus.isInitialSyncDone());
221 // Clear all the messages
222 followerActor.underlyingActor().clear();
// The follower's state stays at index 100 here, so the sync with leader-1
// has not been completed when leader-2's message arrives.
224 context.setLastApplied(100);
225 setLastLogEntry(context, 1, 100,
226 new MockRaftActorContext.MockPayload(""));
228 entries = Arrays.asList(
229 newReplicatedLogEntry(2, 101, "foo"));
231 // leader-2 is becoming the leader now and it says the commitIndex is 45
232 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
233 follower.handleMessage(leaderActor, appendEntries);
235 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
237 // We get a new message saying initial status is not done
238 assertFalse(syncStatus.isInitialSyncDone());
// Verifies that a leader change after the initial sync has completed restarts
// the sync tracking: the follower first reports "not done", then "done" once
// caught up with leader-1, and then "not done" again when an AppendEntries
// from leader-2 is handled.
244 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
245 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
247 MockRaftActorContext context = createActorContext();
249 List<ReplicatedLogEntry> entries = Arrays.asList(
250 newReplicatedLogEntry(2, 101, "foo"));
252 // The new commitIndex is 101
253 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
255 follower = createBehavior(context);
256 follower.handleMessage(leaderActor, appendEntries);
258 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
260 assertFalse(syncStatus.isInitialSyncDone());
262 // Clear all the messages
263 followerActor.underlyingActor().clear();
// Advance the follower to index 101 so the next AppendEntries from leader-1
// completes the initial sync.
265 context.setLastApplied(101);
266 context.setCommitIndex(101);
267 setLastLogEntry(context, 1, 101,
268 new MockRaftActorContext.MockPayload(""));
270 entries = Arrays.asList(
271 newReplicatedLogEntry(2, 101, "foo"));
273 // The new commitIndex is 101
274 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
275 follower.handleMessage(leaderActor, appendEntries);
277 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
279 assertTrue(syncStatus.isInitialSyncDone());
281 // Clear all the messages
282 followerActor.underlyingActor().clear();
284 context.setLastApplied(100);
285 setLastLogEntry(context, 1, 100,
286 new MockRaftActorContext.MockPayload(""));
288 entries = Arrays.asList(
289 newReplicatedLogEntry(2, 101, "foo"));
291 // leader-2 is becoming the leader now and it says the commitIndex is 45
292 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
293 follower.handleMessage(leaderActor, appendEntries);
295 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
297 // We get a new message saying initial status is not done
298 assertFalse(syncStatus.isInitialSyncDone());
304 * This test verifies that when an AppendEntries RPC is received by a RaftActor
305 * with a commitIndex that is greater than what has been applied to the
306 * state machine of the RaftActor, the RaftActor applies the state and
307 * sets its current applied state to the commitIndex of the sender.
// Starts the follower with lastApplied = 100 and a last log entry at index 100,
// handles an AppendEntries that adds entry 101, and asserts that the
// context's lastApplied has advanced to 101.
312 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
313 logStart("testHandleAppendEntriesWithNewerCommitIndex");
315 MockRaftActorContext context = createActorContext();
317 context.setLastApplied(100);
318 setLastLogEntry(context, 1, 100,
319 new MockRaftActorContext.MockPayload(""));
320 context.getReplicatedLog().setSnapshotIndex(99);
322 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
323 newReplicatedLogEntry(2, 101, "foo"));
325 // The new commitIndex is 101
326 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
328 follower = createBehavior(context);
329 follower.handleMessage(leaderActor, appendEntries);
331 assertEquals("getLastApplied", 101L, context.getLastApplied());
335 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
336 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
337 * then the RaftActor does not change its state and it returns a failure.
// Handles an AppendEntries with a higher term (100 vs. the follower's 95) and
// null entries, then asserts that the follower keeps its own behavior instance
// and that the AppendEntriesReply collected on leaderActor reports failure.
342 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
343 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
345 MockRaftActorContext context = createActorContext();
347 // First set the receivers term to lower number
348 context.getTermInformation().update(95, "test");
350 // AppendEntries is now sent with a bigger term
351 // this will set the receivers term to be the same as the sender's term
352 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
354 follower = createBehavior(context);
356 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// No behavior switch is expected: the same instance must be returned.
358 Assert.assertSame(follower, newBehavior);
360 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
361 AppendEntriesReply.class);
363 assertEquals("isSuccess", false, reply.isSuccess());
367 * This test verifies that when a new AppendEntries message is received with
368 * new entries and the logs of the sender and receiver match that the new
369 * entries get added to the log and the log is incremented by the number of
370 * entries received in appendEntries
// Seeds the follower's log with entries 0..2 (term 1), handles an AppendEntries
// carrying entries 3 and 4, and asserts that both were appended, that the
// behavior instance is unchanged, and that a successful reply reporting last
// log term 1 / last log index 4 was sent.
375 public void testHandleAppendEntriesAddNewEntries() {
376 logStart("testHandleAppendEntriesAddNewEntries");
378 MockRaftActorContext context = createActorContext();
380 // First set the receivers term to lower number
381 context.getTermInformation().update(1, "test");
383 // Prepare the receivers log
384 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
385 log.append(newReplicatedLogEntry(1, 0, "zero"));
386 log.append(newReplicatedLogEntry(1, 1, "one"));
387 log.append(newReplicatedLogEntry(1, 2, "two"));
389 context.setReplicatedLog(log);
391 // Prepare the entries to be sent with AppendEntries
392 List<ReplicatedLogEntry> entries = new ArrayList<>();
393 entries.add(newReplicatedLogEntry(1, 3, "three"));
394 entries.add(newReplicatedLogEntry(1, 4, "four"));
396 // Send appendEntries with the same term as was set on the receiver
397 // before the new behavior was created (1 in this case)
398 // This will not work for a Candidate because as soon as a Candidate
399 // is created it increments the term
400 AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
402 follower = createBehavior(context);
404 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
406 Assert.assertSame(follower, newBehavior);
// The log must now end at index 4, i.e. the next free index is 5.
408 assertEquals("Next index", 5, log.last().getIndex() + 1);
409 assertEquals("Entry 3", entries.get(0), log.get(3));
410 assertEquals("Entry 4", entries.get(1), log.get(4));
412 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
416 * This test verifies that when a new AppendEntries message is received with
417 * new entries and the logs of the sender and receiver are out-of-sync that
418 * the log is first corrected by removing the out of sync entries from the
419 * log and then adding in the new entries sent with the AppendEntries message
// Seeds the follower's log with entries 0..2 (term 1), then handles an
// AppendEntries whose first entry conflicts at index 2 (term 2, "two-1").
// Asserts that entry 1 is untouched, that indexes 2 and 3 now hold the sent
// entries, and that a successful reply reporting last log term 2 / last log
// index 3 was sent.
422 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
423 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
425 MockRaftActorContext context = createActorContext();
427 // First set the receivers term to lower number
428 context.getTermInformation().update(1, "test");
430 // Prepare the receivers log
431 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
432 log.append(newReplicatedLogEntry(1, 0, "zero"));
433 log.append(newReplicatedLogEntry(1, 1, "one"));
434 log.append(newReplicatedLogEntry(1, 2, "two"));
436 context.setReplicatedLog(log);
438 // Prepare the entries to be sent with AppendEntries
439 List<ReplicatedLogEntry> entries = new ArrayList<>();
440 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
441 entries.add(newReplicatedLogEntry(2, 3, "three"));
443 // Send appendEntries with the same term as was set on the receiver
444 // before the new behavior was created (1 in this case)
445 // This will not work for a Candidate because as soon as a Candidate
446 // is created it increments the term
// NOTE(review): the message below is built with term 2 while the receiver was
// set to term 1, which does not match the comment above -- confirm intent.
447 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
449 follower = createBehavior(context);
451 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
453 Assert.assertSame(follower, newBehavior);
455 // The entry at index 2 will be found out-of-sync with the leader
456 // and will be removed
457 // Then the two new entries will be added to the log
458 // Thus making the log to have 4 entries
459 assertEquals("Next index", 4, log.last().getIndex() + 1);
460 //assertEquals("Entry 2", entries.get(0), log.get(2));
462 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
464 // Check that the entry at index 2 has the new data
465 assertEquals("Entry 2", entries.get(0), log.get(2));
467 assertEquals("Entry 3", entries.get(1), log.get(3));
469 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Seeds the follower's log with entries 0..2 and handles an AppendEntries that
// refers to index 3 as the previous entry, which the follower's log does not
// contain. Asserts that the behavior instance is unchanged and that a failed
// reply reporting last log term 1 / last log index 2 was sent.
473 public void testHandleAppendEntriesPreviousLogEntryMissing(){
474 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
476 MockRaftActorContext context = createActorContext();
478 // Prepare the receivers log
479 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
480 log.append(newReplicatedLogEntry(1, 0, "zero"));
481 log.append(newReplicatedLogEntry(1, 1, "one"));
482 log.append(newReplicatedLogEntry(1, 2, "two"));
484 context.setReplicatedLog(log);
486 // Prepare the entries to be sent with AppendEntries
487 List<ReplicatedLogEntry> entries = new ArrayList<>();
488 entries.add(newReplicatedLogEntry(1, 4, "four"));
490 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
492 follower = createBehavior(context);
494 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
496 Assert.assertSame(follower, newBehavior);
498 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the follower already has does not grow the
// log: first the existing last entry alone is sent (log still ends at index 1),
// then the same entry plus a new one (log ends at index 2). Both messages must
// produce a successful reply.
502 public void testHandleAppendEntriesWithExistingLogEntry() {
503 logStart("testHandleAppendEntriesWithExistingLogEntry");
505 MockRaftActorContext context = createActorContext();
507 context.getTermInformation().update(1, "test");
509 // Prepare the receivers log
510 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
511 log.append(newReplicatedLogEntry(1, 0, "zero"));
512 log.append(newReplicatedLogEntry(1, 1, "one"));
514 context.setReplicatedLog(log);
516 // Send the last entry again.
517 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
519 follower = createBehavior(context);
521 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
523 assertEquals("Next index", 2, log.last().getIndex() + 1);
524 assertEquals("Entry 1", entries.get(0), log.get(1));
526 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
528 // Send the last entry again and also a new one.
530 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the verification below sees only the second one.
532 leaderActor.underlyingActor().clear();
533 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
535 assertEquals("Next index", 3, log.last().getIndex() + 1);
536 assertEquals("Entry 1", entries.get(0), log.get(1));
537 assertEquals("Entry 2", entries.get(1), log.get(2));
539 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Gives the follower an empty log whose snapshot index/term are 3/1 (as if
// entries up to index 3 had been snapshotted), handles an AppendEntries that
// names index 3 as the previous entry and carries entry 4, and asserts that
// the behavior instance is unchanged and that a successful reply reporting
// last log term 1 / last log index 4 was sent.
543 public void testHandleAppendEntriesAfterInstallingSnapshot(){
// The label passed to logStart matches the method name, as in the other tests.
544 logStart("testHandleAppendEntriesAfterInstallingSnapshot");
546 MockRaftActorContext context = createActorContext();
548 // Prepare the receivers log
549 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
551 // Set up a log as if it has been snapshotted
552 log.setSnapshotIndex(3);
553 log.setSnapshotTerm(1);
555 context.setReplicatedLog(log);
557 // Prepare the entries to be sent with AppendEntries
558 List<ReplicatedLogEntry> entries = new ArrayList<>();
559 entries.add(newReplicatedLogEntry(1, 4, "four"));
561 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
563 follower = createBehavior(context);
565 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
567 Assert.assertSame(follower, newBehavior);
569 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
574 * This test verifies that when InstallSnapshot is received by
575 * the follower it is applied correctly.
// Splits a snapshot into chunks, hands each chunk to the follower as an
// InstallSnapshot message, and then asserts that:
//   - an ApplySnapshot reaching followerActor carries the last included
//     index/term of the final InstallSnapshot and the full snapshot bytes,
//   - leaderActor collected one successful InstallSnapshotReply per chunk, with
//     consecutive chunk indexes, term 1 and the follower's id,
//   - the follower's snapshot tracker is null afterwards.
580 public void testHandleInstallSnapshot() throws Exception {
581 logStart("testHandleInstallSnapshot");
583 MockRaftActorContext context = createActorContext();
585 follower = createBehavior(context);
587 ByteString bsSnapshot = createSnapshot();
589 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
591 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
592 int lastIncludedIndex = 1;
594 InstallSnapshot lastInstallSnapshot = null;
596 for(int i = 0; i < totalChunks; i++) {
597 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
598 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
599 chunkData, chunkIndex, totalChunks);
600 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the offset advances by a literal 50 rather than by chunkSize;
// confirm the two are meant to be equal.
601 offset = offset + 50;
606 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
607 ApplySnapshot.class);
608 Snapshot snapshot = applySnapshot.getSnapshot();
609 assertNotNull(lastInstallSnapshot);
610 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
611 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
612 snapshot.getLastAppliedTerm());
613 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
614 snapshot.getLastAppliedIndex());
615 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
616 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
618 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
619 leaderActor, InstallSnapshotReply.class);
620 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
623 for(InstallSnapshotReply reply: replies) {
624 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
625 assertEquals("getTerm", 1, reply.getTerm());
626 assertEquals("isSuccess", true, reply.isSuccess());
627 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
630 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
635 * Verify that when an AppendEntries is sent to a follower during a snapshot install
636 * the Follower short-circuits the processing of the AppendEntries message.
// Starts a multi-chunk snapshot install by sending only the first chunk, then
// hands the follower a mocked AppendEntries. Asserts that the reply reflects
// the context's current last log index/term and current term, and that the
// follower never read prevLogIndex from the message.
641 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
642 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
644 MockRaftActorContext context = createActorContext();
646 follower = createBehavior(context);
648 ByteString bsSnapshot = createSnapshot();
649 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
651 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
652 int lastIncludedIndex = 1;
654 // Check that snapshot installation is not in progress
655 assertNull(((Follower) follower).getSnapshotTracker());
657 // Make sure that we have more than 1 chunk to send
658 assertTrue(totalChunks > 1);
660 // Send an install snapshot with the first chunk to start the process of installing a snapshot
661 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
662 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
663 chunkData, 1, totalChunks));
665 // Check if snapshot installation is in progress now
666 assertNotNull(((Follower) follower).getSnapshotTracker());
668 // Send an append entry
// A Mockito mock is used so that accesses to the message can be verified below.
669 AppendEntries appendEntries = mock(AppendEntries.class);
670 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
672 follower.handleMessage(leaderActor, appendEntries);
674 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
675 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
676 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
677 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
679 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
680 verify(appendEntries, never()).getPrevLogIndex();
// Installs a snapshot chunk by chunk and asserts that the follower reports
// "initial sync not done"; then advances the context to index 101, handles an
// AppendEntries from the same leader, and asserts that the follower reports
// "initial sync done".
685 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
// The label passed to logStart matches the method name, as in the other tests.
686 logStart("testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries");
688 MockRaftActorContext context = createActorContext();
690 follower = createBehavior(context);
692 ByteString bsSnapshot = createSnapshot();
694 int snapshotLength = bsSnapshot.size();
// Ceiling division: a trailing partial chunk counts as one more chunk.
696 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
697 int lastIncludedIndex = 1;
699 InstallSnapshot lastInstallSnapshot = null;
701 for(int i = 0; i < totalChunks; i++) {
702 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
703 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
704 chunkData, chunkIndex, totalChunks);
705 follower.handleMessage(leaderActor, lastInstallSnapshot);
706 offset = offset + 50;
711 FollowerInitialSyncUpStatus syncStatus =
712 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
714 assertFalse(syncStatus.isInitialSyncDone());
716 // Clear all the messages
717 followerActor.underlyingActor().clear();
// Advance the follower to index 101 before the AppendEntries is handled.
719 context.setLastApplied(101);
720 context.setCommitIndex(101);
721 setLastLogEntry(context, 1, 101,
722 new MockRaftActorContext.MockPayload(""));
724 List<ReplicatedLogEntry> entries = Arrays.asList(
725 newReplicatedLogEntry(2, 101, "foo"));
727 // The new commitIndex is 101
728 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
729 follower.handleMessage(leaderActor, appendEntries);
731 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
733 assertTrue(syncStatus.isInitialSyncDone());
// Sends an InstallSnapshot whose chunk index is 3 of 3 to a follower that has
// received no earlier chunk. Asserts that the reply is a failure with chunk
// index -1, term 1 and the follower's id, and that no snapshot tracker is
// left on the follower.
737 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
738 logStart("testHandleOutOfSequenceInstallSnapshot");
740 MockRaftActorContext context = createActorContext();
742 follower = createBehavior(context);
744 ByteString bsSnapshot = createSnapshot();
746 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
747 getNextChunk(bsSnapshot, 10, 50), 3, 3);
748 follower.handleMessage(leaderActor, installSnapshot);
750 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
751 InstallSnapshotReply.class);
753 assertEquals("isSuccess", false, reply.isSuccess());
754 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
755 assertEquals("getTerm", 1, reply.getTerm());
756 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
758 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
// Measures the time between creating the follower and the arrival of its
// ElectionTimeout at followerActor, and asserts that it is shorter than the
// configured election timeout interval.
// NOTE(review): per the test name the default mock context presumably has no
// peers -- confirm against MockRaftActorContext.
762 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
763 MockRaftActorContext context = createActorContext();
// Start timing before the behavior is created, since creation is what
// triggers the timeout being measured.
765 Stopwatch stopwatch = Stopwatch.createStarted();
767 follower = createBehavior(context);
769 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
771 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
773 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
// Returns the slice of bs that begins at `start` and is at most chunkSize bytes
// long; the length is reduced when the snapshot is shorter than chunkSize or
// when fewer than chunkSize bytes remain after `start`.
// NOTE(review): `start` is presumably initialised from the offset parameter --
// confirm.
776 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
777 int snapshotLength = bs.size();
779 int size = chunkSize;
780 if (chunkSize > snapshotLength) {
781 size = snapshotLength;
783 if ((start + chunkSize) > snapshotLength) {
// Final chunk: take only the bytes that are left.
784 size = snapshotLength - start;
787 return bs.substring(start, start + size);
// Waits for the first AppendEntriesReply collected by leaderActor and asserts
// its success flag, term, follower id, last log term and last log index
// against the expected values.
// Note the argument order: the expected last log TERM comes before the
// expected last log INDEX.
790 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
791 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
793 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
794 AppendEntriesReply.class);
796 assertEquals("isSuccess", expSuccess, reply.isSuccess());
797 assertEquals("getTerm", expTerm, reply.getTerm());
798 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
799 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
800 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
// Convenience factory: builds a MockReplicatedLogEntry with the given term and
// index, wrapping the data string in a MockPayload.
803 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
804 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
805 new MockRaftActorContext.MockPayload(data));
// Builds the snapshot payload used by the install-snapshot tests: a
// three-entry map ("1"->"A", "2"->"B", "3"->"C") converted with toByteString.
808 private ByteString createSnapshot(){
809 HashMap<String, String> followerSnapshot = new HashMap<>();
810 followerSnapshot.put("1", "A");
811 followerSnapshot.put("2", "B");
812 followerSnapshot.put("3", "C");
814 return toByteString(followerSnapshot);
// Runs the superclass check and then additionally asserts the recorded vote:
// votedFor must equal the candidate id when the rpc is a RequestVote, and must
// be null for any other rpc type.
818 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
819 ActorRef actorRef, RaftRPC rpc) throws Exception {
820 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
822 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
823 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
// Expects an AppendEntriesReply on replyActor and asserts that it reports
// success.
827 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
829 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
830 assertEquals("isSuccess", true, reply.isSuccess());