1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.protobuf.ByteString;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.HashMap;
19 import java.util.List;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftActorContext;
25 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
26 import org.opendaylight.controller.cluster.raft.Snapshot;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
28 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
29 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
35 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
36 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
37 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
39 public class FollowerTest extends AbstractRaftActorBehaviorTest {
41 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
42 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
44 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
45 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
47 private RaftActorBehavior follower;
51 public void tearDown() throws Exception {
52 if(follower != null) {
60 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
61 return new Follower(actorContext);
65 protected MockRaftActorContext createActorContext() {
66 return createActorContext(followerActor);
70 protected MockRaftActorContext createActorContext(ActorRef actorRef){
71 return new MockRaftActorContext("follower", getSystem(), actorRef);
75 public void testThatAnElectionTimeoutIsTriggered(){
76 MockRaftActorContext actorContext = createActorContext();
77 follower = new Follower(actorContext);
79 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
80 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
84 public void testHandleElectionTimeout(){
85 logStart("testHandleElectionTimeout");
87 follower = new Follower(createActorContext());
89 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
91 assertTrue(raftBehavior instanceof Candidate);
95 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
96 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
98 RaftActorContext context = createActorContext();
100 context.getTermInformation().update(term, null);
102 follower = createBehavior(context);
104 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
106 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
108 assertEquals("isVoteGranted", true, reply.isVoteGranted());
109 assertEquals("getTerm", term, reply.getTerm());
113 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
114 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
116 RaftActorContext context = createActorContext();
118 context.getTermInformation().update(term, "test");
120 follower = createBehavior(context);
122 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
124 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
126 assertEquals("isVoteGranted", false, reply.isVoteGranted());
131 public void testHandleFirstAppendEntries() throws Exception {
132 logStart("testHandleFirstAppendEntries");
134 MockRaftActorContext context = createActorContext();
136 List<ReplicatedLogEntry> entries = Arrays.asList(
137 newReplicatedLogEntry(2, 101, "foo"));
139 // The new commitIndex is 101
140 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
142 follower = createBehavior(context);
143 follower.handleMessage(leaderActor, appendEntries);
145 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
147 assertFalse(syncStatus.isInitialSyncDone());
151 public void testHandleSyncUpAppendEntries() throws Exception {
152 logStart("testHandleSyncUpAppendEntries");
154 MockRaftActorContext context = createActorContext();
156 List<ReplicatedLogEntry> entries = Arrays.asList(
157 newReplicatedLogEntry(2, 101, "foo"));
159 // The new commitIndex is 101
160 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
162 follower = createBehavior(context);
163 follower.handleMessage(leaderActor, appendEntries);
165 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
167 assertFalse(syncStatus.isInitialSyncDone());
169 // Clear all the messages
170 followerActor.underlyingActor().clear();
172 context.setLastApplied(101);
173 context.setCommitIndex(101);
174 setLastLogEntry(context, 1, 101,
175 new MockRaftActorContext.MockPayload(""));
177 entries = Arrays.asList(
178 newReplicatedLogEntry(2, 101, "foo"));
180 // The new commitIndex is 101
181 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
182 follower.handleMessage(leaderActor, appendEntries);
184 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
186 assertTrue(syncStatus.isInitialSyncDone());
188 followerActor.underlyingActor().clear();
190 // Sending the same message again should not generate another message
192 follower.handleMessage(leaderActor, appendEntries);
194 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
196 assertNull(syncStatus);
201 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
202 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
204 MockRaftActorContext context = createActorContext();
206 List<ReplicatedLogEntry> entries = Arrays.asList(
207 newReplicatedLogEntry(2, 101, "foo"));
209 // The new commitIndex is 101
210 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
212 follower = createBehavior(context);
213 follower.handleMessage(leaderActor, appendEntries);
215 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
217 assertFalse(syncStatus.isInitialSyncDone());
219 // Clear all the messages
220 followerActor.underlyingActor().clear();
222 context.setLastApplied(100);
223 setLastLogEntry(context, 1, 100,
224 new MockRaftActorContext.MockPayload(""));
226 entries = Arrays.asList(
227 newReplicatedLogEntry(2, 101, "foo"));
229 // leader-2 is becoming the leader now and it says the commitIndex is 45
230 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
231 follower.handleMessage(leaderActor, appendEntries);
233 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
235 // We get a new message saying initial status is not done
236 assertFalse(syncStatus.isInitialSyncDone());
242 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
243 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
245 MockRaftActorContext context = createActorContext();
247 List<ReplicatedLogEntry> entries = Arrays.asList(
248 newReplicatedLogEntry(2, 101, "foo"));
250 // The new commitIndex is 101
251 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
253 follower = createBehavior(context);
254 follower.handleMessage(leaderActor, appendEntries);
256 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
258 assertFalse(syncStatus.isInitialSyncDone());
260 // Clear all the messages
261 followerActor.underlyingActor().clear();
263 context.setLastApplied(101);
264 context.setCommitIndex(101);
265 setLastLogEntry(context, 1, 101,
266 new MockRaftActorContext.MockPayload(""));
268 entries = Arrays.asList(
269 newReplicatedLogEntry(2, 101, "foo"));
271 // The new commitIndex is 101
272 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
273 follower.handleMessage(leaderActor, appendEntries);
275 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
277 assertTrue(syncStatus.isInitialSyncDone());
279 // Clear all the messages
280 followerActor.underlyingActor().clear();
282 context.setLastApplied(100);
283 setLastLogEntry(context, 1, 100,
284 new MockRaftActorContext.MockPayload(""));
286 entries = Arrays.asList(
287 newReplicatedLogEntry(2, 101, "foo"));
289 // leader-2 is becoming the leader now and it says the commitIndex is 45
290 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
291 follower.handleMessage(leaderActor, appendEntries);
293 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
295 // We get a new message saying initial status is not done
296 assertFalse(syncStatus.isInitialSyncDone());
302 * This test verifies that when an AppendEntries RPC is received by a RaftActor
303 * with a commitIndex that is greater than what has been applied to the
304 * state machine of the RaftActor, the RaftActor applies the state and
305 * sets it current applied state to the commitIndex of the sender.
310 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
311 logStart("testHandleAppendEntriesWithNewerCommitIndex");
313 MockRaftActorContext context = createActorContext();
315 context.setLastApplied(100);
316 setLastLogEntry(context, 1, 100,
317 new MockRaftActorContext.MockPayload(""));
318 context.getReplicatedLog().setSnapshotIndex(99);
320 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
321 newReplicatedLogEntry(2, 101, "foo"));
323 // The new commitIndex is 101
324 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
326 follower = createBehavior(context);
327 follower.handleMessage(leaderActor, appendEntries);
329 assertEquals("getLastApplied", 101L, context.getLastApplied());
333 * This test verifies that when an AppendEntries is received a specific prevLogTerm
334 * which does not match the term that is in RaftActors log entry at prevLogIndex
335 * then the RaftActor does not change it's state and it returns a failure.
340 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
341 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
343 MockRaftActorContext context = createActorContext();
345 // First set the receivers term to lower number
346 context.getTermInformation().update(95, "test");
348 // AppendEntries is now sent with a bigger term
349 // this will set the receivers term to be the same as the sender's term
350 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
352 follower = createBehavior(context);
354 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
356 Assert.assertSame(follower, newBehavior);
358 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
359 AppendEntriesReply.class);
361 assertEquals("isSuccess", false, reply.isSuccess());
365 * This test verifies that when a new AppendEntries message is received with
366 * new entries and the logs of the sender and receiver match that the new
367 * entries get added to the log and the log is incremented by the number of
368 * entries received in appendEntries
373 public void testHandleAppendEntriesAddNewEntries() {
374 logStart("testHandleAppendEntriesAddNewEntries");
376 MockRaftActorContext context = createActorContext();
378 // First set the receivers term to lower number
379 context.getTermInformation().update(1, "test");
381 // Prepare the receivers log
382 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
383 log.append(newReplicatedLogEntry(1, 0, "zero"));
384 log.append(newReplicatedLogEntry(1, 1, "one"));
385 log.append(newReplicatedLogEntry(1, 2, "two"));
387 context.setReplicatedLog(log);
389 // Prepare the entries to be sent with AppendEntries
390 List<ReplicatedLogEntry> entries = new ArrayList<>();
391 entries.add(newReplicatedLogEntry(1, 3, "three"));
392 entries.add(newReplicatedLogEntry(1, 4, "four"));
394 // Send appendEntries with the same term as was set on the receiver
395 // before the new behavior was created (1 in this case)
396 // This will not work for a Candidate because as soon as a Candidate
397 // is created it increments the term
398 AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
400 follower = createBehavior(context);
402 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
404 Assert.assertSame(follower, newBehavior);
406 assertEquals("Next index", 5, log.last().getIndex() + 1);
407 assertEquals("Entry 3", entries.get(0), log.get(3));
408 assertEquals("Entry 4", entries.get(1), log.get(4));
410 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
414 * This test verifies that when a new AppendEntries message is received with
415 * new entries and the logs of the sender and receiver are out-of-sync that
416 * the log is first corrected by removing the out of sync entries from the
417 * log and then adding in the new entries sent with the AppendEntries message
420 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
421 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
423 MockRaftActorContext context = createActorContext();
425 // First set the receivers term to lower number
426 context.getTermInformation().update(1, "test");
428 // Prepare the receivers log
429 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
430 log.append(newReplicatedLogEntry(1, 0, "zero"));
431 log.append(newReplicatedLogEntry(1, 1, "one"));
432 log.append(newReplicatedLogEntry(1, 2, "two"));
434 context.setReplicatedLog(log);
436 // Prepare the entries to be sent with AppendEntries
437 List<ReplicatedLogEntry> entries = new ArrayList<>();
438 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
439 entries.add(newReplicatedLogEntry(2, 3, "three"));
441 // Send appendEntries with the same term as was set on the receiver
442 // before the new behavior was created (1 in this case)
443 // This will not work for a Candidate because as soon as a Candidate
444 // is created it increments the term
445 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
447 follower = createBehavior(context);
449 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
451 Assert.assertSame(follower, newBehavior);
453 // The entry at index 2 will be found out-of-sync with the leader
454 // and will be removed
455 // Then the two new entries will be added to the log
456 // Thus making the log to have 4 entries
457 assertEquals("Next index", 4, log.last().getIndex() + 1);
458 //assertEquals("Entry 2", entries.get(0), log.get(2));
460 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
462 // Check that the entry at index 2 has the new data
463 assertEquals("Entry 2", entries.get(0), log.get(2));
465 assertEquals("Entry 3", entries.get(1), log.get(3));
467 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
471 public void testHandleAppendEntriesPreviousLogEntryMissing(){
472 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
474 MockRaftActorContext context = createActorContext();
476 // Prepare the receivers log
477 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
478 log.append(newReplicatedLogEntry(1, 0, "zero"));
479 log.append(newReplicatedLogEntry(1, 1, "one"));
480 log.append(newReplicatedLogEntry(1, 2, "two"));
482 context.setReplicatedLog(log);
484 // Prepare the entries to be sent with AppendEntries
485 List<ReplicatedLogEntry> entries = new ArrayList<>();
486 entries.add(newReplicatedLogEntry(1, 4, "four"));
488 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
490 follower = createBehavior(context);
492 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
494 Assert.assertSame(follower, newBehavior);
496 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
500 public void testHandleAppendEntriesWithExistingLogEntry() {
501 logStart("testHandleAppendEntriesWithExistingLogEntry");
503 MockRaftActorContext context = createActorContext();
505 context.getTermInformation().update(1, "test");
507 // Prepare the receivers log
508 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
509 log.append(newReplicatedLogEntry(1, 0, "zero"));
510 log.append(newReplicatedLogEntry(1, 1, "one"));
512 context.setReplicatedLog(log);
514 // Send the last entry again.
515 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
517 follower = createBehavior(context);
519 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
521 assertEquals("Next index", 2, log.last().getIndex() + 1);
522 assertEquals("Entry 1", entries.get(0), log.get(1));
524 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
526 // Send the last entry again and also a new one.
528 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
530 leaderActor.underlyingActor().clear();
531 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
533 assertEquals("Next index", 3, log.last().getIndex() + 1);
534 assertEquals("Entry 1", entries.get(0), log.get(1));
535 assertEquals("Entry 2", entries.get(1), log.get(2));
537 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
541 public void testHandleAppendEntriesAfterInstallingSnapshot(){
542 logStart("testHandleAppendAfterInstallingSnapshot");
544 MockRaftActorContext context = createActorContext();
546 // Prepare the receivers log
547 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
549 // Set up a log as if it has been snapshotted
550 log.setSnapshotIndex(3);
551 log.setSnapshotTerm(1);
553 context.setReplicatedLog(log);
555 // Prepare the entries to be sent with AppendEntries
556 List<ReplicatedLogEntry> entries = new ArrayList<>();
557 entries.add(newReplicatedLogEntry(1, 4, "four"));
559 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
561 follower = createBehavior(context);
563 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
565 Assert.assertSame(follower, newBehavior);
567 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
572 * This test verifies that when InstallSnapshot is received by
573 * the follower its applied correctly.
578 public void testHandleInstallSnapshot() throws Exception {
579 logStart("testHandleInstallSnapshot");
581 MockRaftActorContext context = createActorContext();
583 follower = createBehavior(context);
585 ByteString bsSnapshot = createSnapshot();
587 int snapshotLength = bsSnapshot.size();
589 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
590 int lastIncludedIndex = 1;
592 InstallSnapshot lastInstallSnapshot = null;
594 for(int i = 0; i < totalChunks; i++) {
595 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
596 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
597 chunkData, chunkIndex, totalChunks);
598 follower.handleMessage(leaderActor, lastInstallSnapshot);
599 offset = offset + 50;
604 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
605 ApplySnapshot.class);
606 Snapshot snapshot = applySnapshot.getSnapshot();
607 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
608 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
609 snapshot.getLastAppliedTerm());
610 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
611 snapshot.getLastAppliedIndex());
612 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
613 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
615 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
616 leaderActor, InstallSnapshotReply.class);
617 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
620 for(InstallSnapshotReply reply: replies) {
621 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
622 assertEquals("getTerm", 1, reply.getTerm());
623 assertEquals("isSuccess", true, reply.isSuccess());
624 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
627 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
632 * Verify that when an AppendEntries is sent to a follower during a snapshot install
633 * the Follower short-circuits the processing of the AppendEntries message.
638 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
639 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
641 MockRaftActorContext context = createActorContext();
643 follower = createBehavior(context);
645 ByteString bsSnapshot = createSnapshot();
646 int snapshotLength = bsSnapshot.size();
648 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
649 int lastIncludedIndex = 1;
651 // Check that snapshot installation is not in progress
652 assertNull(((Follower) follower).getSnapshotTracker());
654 // Make sure that we have more than 1 chunk to send
655 assertTrue(totalChunks > 1);
657 // Send an install snapshot with the first chunk to start the process of installing a snapshot
658 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
659 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
660 chunkData, 1, totalChunks));
662 // Check if snapshot installation is in progress now
663 assertNotNull(((Follower) follower).getSnapshotTracker());
665 // Send an append entry
666 AppendEntries appendEntries = mock(AppendEntries.class);
667 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
669 follower.handleMessage(leaderActor, appendEntries);
671 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
672 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
673 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
674 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
676 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
677 verify(appendEntries, never()).getPrevLogIndex();
682 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
683 logStart("testInitialSyncUpWithHandleInstallSnapshot");
685 MockRaftActorContext context = createActorContext();
687 follower = createBehavior(context);
689 ByteString bsSnapshot = createSnapshot();
691 int snapshotLength = bsSnapshot.size();
693 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
694 int lastIncludedIndex = 1;
696 InstallSnapshot lastInstallSnapshot = null;
698 for(int i = 0; i < totalChunks; i++) {
699 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
700 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
701 chunkData, chunkIndex, totalChunks);
702 follower.handleMessage(leaderActor, lastInstallSnapshot);
703 offset = offset + 50;
708 FollowerInitialSyncUpStatus syncStatus =
709 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
711 assertFalse(syncStatus.isInitialSyncDone());
713 // Clear all the messages
714 followerActor.underlyingActor().clear();
716 context.setLastApplied(101);
717 context.setCommitIndex(101);
718 setLastLogEntry(context, 1, 101,
719 new MockRaftActorContext.MockPayload(""));
721 List<ReplicatedLogEntry> entries = Arrays.asList(
722 newReplicatedLogEntry(2, 101, "foo"));
724 // The new commitIndex is 101
725 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
726 follower.handleMessage(leaderActor, appendEntries);
728 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
730 assertTrue(syncStatus.isInitialSyncDone());
734 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
735 logStart("testHandleOutOfSequenceInstallSnapshot");
737 MockRaftActorContext context = createActorContext();
739 follower = createBehavior(context);
741 ByteString bsSnapshot = createSnapshot();
743 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
744 getNextChunk(bsSnapshot, 10, 50), 3, 3);
745 follower.handleMessage(leaderActor, installSnapshot);
747 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
748 InstallSnapshotReply.class);
750 assertEquals("isSuccess", false, reply.isSuccess());
751 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
752 assertEquals("getTerm", 1, reply.getTerm());
753 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
755 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
758 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
759 int snapshotLength = bs.size();
761 int size = chunkSize;
762 if (chunkSize > snapshotLength) {
763 size = snapshotLength;
765 if ((start + chunkSize) > snapshotLength) {
766 size = snapshotLength - start;
769 return bs.substring(start, start + size);
772 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
773 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
775 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
776 AppendEntriesReply.class);
778 assertEquals("isSuccess", expSuccess, reply.isSuccess());
779 assertEquals("getTerm", expTerm, reply.getTerm());
780 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
781 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
782 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
785 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
786 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
787 new MockRaftActorContext.MockPayload(data));
790 private ByteString createSnapshot(){
791 HashMap<String, String> followerSnapshot = new HashMap<>();
792 followerSnapshot.put("1", "A");
793 followerSnapshot.put("2", "B");
794 followerSnapshot.put("3", "C");
796 return toByteString(followerSnapshot);
800 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
801 ActorRef actorRef, RaftRPC rpc) throws Exception {
802 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
804 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
805 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
809 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
811 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
812 assertEquals("isSuccess", true, reply.isSuccess());