1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
26 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftActorContext;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.Snapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
32 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
39 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.duration.FiniteDuration;
43 public class FollowerTest extends AbstractRaftActorBehaviorTest {
45 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
46 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
48 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
49 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
51 private RaftActorBehavior follower;
53 private final short payloadVersion = 5;
57 public void tearDown() throws Exception {
58 if(follower != null) {
66 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
67 return new Follower(actorContext);
71 protected MockRaftActorContext createActorContext() {
72 return createActorContext(followerActor);
76 protected MockRaftActorContext createActorContext(ActorRef actorRef){
77 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
78 context.setPayloadVersion(payloadVersion );
83 public void testThatAnElectionTimeoutIsTriggered(){
84 MockRaftActorContext actorContext = createActorContext();
85 follower = new Follower(actorContext);
87 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
88 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
92 public void testHandleElectionTimeout(){
93 logStart("testHandleElectionTimeout");
95 follower = new Follower(createActorContext());
97 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
99 assertTrue(raftBehavior instanceof Candidate);
103 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
104 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
106 RaftActorContext context = createActorContext();
108 context.getTermInformation().update(term, null);
110 follower = createBehavior(context);
112 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
114 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
116 assertEquals("isVoteGranted", true, reply.isVoteGranted());
117 assertEquals("getTerm", term, reply.getTerm());
121 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
122 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
124 RaftActorContext context = createActorContext();
126 context.getTermInformation().update(term, "test");
128 follower = createBehavior(context);
130 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
132 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
134 assertEquals("isVoteGranted", false, reply.isVoteGranted());
139 public void testHandleFirstAppendEntries() throws Exception {
140 logStart("testHandleFirstAppendEntries");
142 MockRaftActorContext context = createActorContext();
144 List<ReplicatedLogEntry> entries = Arrays.asList(
145 newReplicatedLogEntry(2, 101, "foo"));
147 // The new commitIndex is 101
148 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
150 follower = createBehavior(context);
151 follower.handleMessage(leaderActor, appendEntries);
153 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
155 assertFalse(syncStatus.isInitialSyncDone());
159 public void testHandleSyncUpAppendEntries() throws Exception {
160 logStart("testHandleSyncUpAppendEntries");
162 MockRaftActorContext context = createActorContext();
164 List<ReplicatedLogEntry> entries = Arrays.asList(
165 newReplicatedLogEntry(2, 101, "foo"));
167 // The new commitIndex is 101
168 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
170 follower = createBehavior(context);
171 follower.handleMessage(leaderActor, appendEntries);
173 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
175 assertFalse(syncStatus.isInitialSyncDone());
177 // Clear all the messages
178 followerActor.underlyingActor().clear();
180 context.setLastApplied(101);
181 context.setCommitIndex(101);
182 setLastLogEntry(context, 1, 101,
183 new MockRaftActorContext.MockPayload(""));
185 entries = Arrays.asList(
186 newReplicatedLogEntry(2, 101, "foo"));
188 // The new commitIndex is 101
189 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
190 follower.handleMessage(leaderActor, appendEntries);
192 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
194 assertTrue(syncStatus.isInitialSyncDone());
196 followerActor.underlyingActor().clear();
198 // Sending the same message again should not generate another message
200 follower.handleMessage(leaderActor, appendEntries);
202 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
204 assertNull(syncStatus);
209 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
210 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
212 MockRaftActorContext context = createActorContext();
214 List<ReplicatedLogEntry> entries = Arrays.asList(
215 newReplicatedLogEntry(2, 101, "foo"));
217 // The new commitIndex is 101
218 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
220 follower = createBehavior(context);
221 follower.handleMessage(leaderActor, appendEntries);
223 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
225 assertFalse(syncStatus.isInitialSyncDone());
227 // Clear all the messages
228 followerActor.underlyingActor().clear();
230 context.setLastApplied(100);
231 setLastLogEntry(context, 1, 100,
232 new MockRaftActorContext.MockPayload(""));
234 entries = Arrays.asList(
235 newReplicatedLogEntry(2, 101, "foo"));
237 // leader-2 is becoming the leader now and it says the commitIndex is 45
238 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
239 follower.handleMessage(leaderActor, appendEntries);
241 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
243 // We get a new message saying initial status is not done
244 assertFalse(syncStatus.isInitialSyncDone());
250 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
251 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
253 MockRaftActorContext context = createActorContext();
255 List<ReplicatedLogEntry> entries = Arrays.asList(
256 newReplicatedLogEntry(2, 101, "foo"));
258 // The new commitIndex is 101
259 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
261 follower = createBehavior(context);
262 follower.handleMessage(leaderActor, appendEntries);
264 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
266 assertFalse(syncStatus.isInitialSyncDone());
268 // Clear all the messages
269 followerActor.underlyingActor().clear();
271 context.setLastApplied(101);
272 context.setCommitIndex(101);
273 setLastLogEntry(context, 1, 101,
274 new MockRaftActorContext.MockPayload(""));
276 entries = Arrays.asList(
277 newReplicatedLogEntry(2, 101, "foo"));
279 // The new commitIndex is 101
280 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
281 follower.handleMessage(leaderActor, appendEntries);
283 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
285 assertTrue(syncStatus.isInitialSyncDone());
287 // Clear all the messages
288 followerActor.underlyingActor().clear();
290 context.setLastApplied(100);
291 setLastLogEntry(context, 1, 100,
292 new MockRaftActorContext.MockPayload(""));
294 entries = Arrays.asList(
295 newReplicatedLogEntry(2, 101, "foo"));
297 // leader-2 is becoming the leader now and it says the commitIndex is 45
298 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
299 follower.handleMessage(leaderActor, appendEntries);
301 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
303 // We get a new message saying initial status is not done
304 assertFalse(syncStatus.isInitialSyncDone());
310 * This test verifies that when an AppendEntries RPC is received by a RaftActor
311 * with a commitIndex that is greater than what has been applied to the
312 * state machine of the RaftActor, the RaftActor applies the state and
313 * sets it current applied state to the commitIndex of the sender.
318 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
319 logStart("testHandleAppendEntriesWithNewerCommitIndex");
321 MockRaftActorContext context = createActorContext();
323 context.setLastApplied(100);
324 setLastLogEntry(context, 1, 100,
325 new MockRaftActorContext.MockPayload(""));
326 context.getReplicatedLog().setSnapshotIndex(99);
328 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
329 newReplicatedLogEntry(2, 101, "foo"));
331 // The new commitIndex is 101
332 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
334 follower = createBehavior(context);
335 follower.handleMessage(leaderActor, appendEntries);
337 assertEquals("getLastApplied", 101L, context.getLastApplied());
341 * This test verifies that when an AppendEntries is received a specific prevLogTerm
342 * which does not match the term that is in RaftActors log entry at prevLogIndex
343 * then the RaftActor does not change it's state and it returns a failure.
348 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
349 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
351 MockRaftActorContext context = createActorContext();
353 // First set the receivers term to lower number
354 context.getTermInformation().update(95, "test");
356 // AppendEntries is now sent with a bigger term
357 // this will set the receivers term to be the same as the sender's term
358 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
360 follower = createBehavior(context);
362 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
364 Assert.assertSame(follower, newBehavior);
366 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
367 AppendEntriesReply.class);
369 assertEquals("isSuccess", false, reply.isSuccess());
373 * This test verifies that when a new AppendEntries message is received with
374 * new entries and the logs of the sender and receiver match that the new
375 * entries get added to the log and the log is incremented by the number of
376 * entries received in appendEntries
381 public void testHandleAppendEntriesAddNewEntries() {
382 logStart("testHandleAppendEntriesAddNewEntries");
384 MockRaftActorContext context = createActorContext();
386 // First set the receivers term to lower number
387 context.getTermInformation().update(1, "test");
389 // Prepare the receivers log
390 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
391 log.append(newReplicatedLogEntry(1, 0, "zero"));
392 log.append(newReplicatedLogEntry(1, 1, "one"));
393 log.append(newReplicatedLogEntry(1, 2, "two"));
395 context.setReplicatedLog(log);
397 // Prepare the entries to be sent with AppendEntries
398 List<ReplicatedLogEntry> entries = new ArrayList<>();
399 entries.add(newReplicatedLogEntry(1, 3, "three"));
400 entries.add(newReplicatedLogEntry(1, 4, "four"));
402 // Send appendEntries with the same term as was set on the receiver
403 // before the new behavior was created (1 in this case)
404 // This will not work for a Candidate because as soon as a Candidate
405 // is created it increments the term
406 short leaderPayloadVersion = 10;
407 String leaderId = "leader-1";
408 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
410 follower = createBehavior(context);
412 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
414 Assert.assertSame(follower, newBehavior);
416 assertEquals("Next index", 5, log.last().getIndex() + 1);
417 assertEquals("Entry 3", entries.get(0), log.get(3));
418 assertEquals("Entry 4", entries.get(1), log.get(4));
420 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
421 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
423 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
427 * This test verifies that when a new AppendEntries message is received with
428 * new entries and the logs of the sender and receiver are out-of-sync that
429 * the log is first corrected by removing the out of sync entries from the
430 * log and then adding in the new entries sent with the AppendEntries message
433 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
434 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
436 MockRaftActorContext context = createActorContext();
438 // First set the receivers term to lower number
439 context.getTermInformation().update(1, "test");
441 // Prepare the receivers log
442 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
443 log.append(newReplicatedLogEntry(1, 0, "zero"));
444 log.append(newReplicatedLogEntry(1, 1, "one"));
445 log.append(newReplicatedLogEntry(1, 2, "two"));
447 context.setReplicatedLog(log);
449 // Prepare the entries to be sent with AppendEntries
450 List<ReplicatedLogEntry> entries = new ArrayList<>();
451 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
452 entries.add(newReplicatedLogEntry(2, 3, "three"));
454 // Send appendEntries with the same term as was set on the receiver
455 // before the new behavior was created (1 in this case)
456 // This will not work for a Candidate because as soon as a Candidate
457 // is created it increments the term
458 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
460 follower = createBehavior(context);
462 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
464 Assert.assertSame(follower, newBehavior);
466 // The entry at index 2 will be found out-of-sync with the leader
467 // and will be removed
468 // Then the two new entries will be added to the log
469 // Thus making the log to have 4 entries
470 assertEquals("Next index", 4, log.last().getIndex() + 1);
471 //assertEquals("Entry 2", entries.get(0), log.get(2));
473 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
475 // Check that the entry at index 2 has the new data
476 assertEquals("Entry 2", entries.get(0), log.get(2));
478 assertEquals("Entry 3", entries.get(1), log.get(3));
480 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
484 public void testHandleAppendEntriesPreviousLogEntryMissing(){
485 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
487 MockRaftActorContext context = createActorContext();
489 // Prepare the receivers log
490 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
491 log.append(newReplicatedLogEntry(1, 0, "zero"));
492 log.append(newReplicatedLogEntry(1, 1, "one"));
493 log.append(newReplicatedLogEntry(1, 2, "two"));
495 context.setReplicatedLog(log);
497 // Prepare the entries to be sent with AppendEntries
498 List<ReplicatedLogEntry> entries = new ArrayList<>();
499 entries.add(newReplicatedLogEntry(1, 4, "four"));
501 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
503 follower = createBehavior(context);
505 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
507 Assert.assertSame(follower, newBehavior);
509 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
513 public void testHandleAppendEntriesWithExistingLogEntry() {
514 logStart("testHandleAppendEntriesWithExistingLogEntry");
516 MockRaftActorContext context = createActorContext();
518 context.getTermInformation().update(1, "test");
520 // Prepare the receivers log
521 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
522 log.append(newReplicatedLogEntry(1, 0, "zero"));
523 log.append(newReplicatedLogEntry(1, 1, "one"));
525 context.setReplicatedLog(log);
527 // Send the last entry again.
528 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
530 follower = createBehavior(context);
532 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
534 assertEquals("Next index", 2, log.last().getIndex() + 1);
535 assertEquals("Entry 1", entries.get(0), log.get(1));
537 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
539 // Send the last entry again and also a new one.
541 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
543 leaderActor.underlyingActor().clear();
544 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
546 assertEquals("Next index", 3, log.last().getIndex() + 1);
547 assertEquals("Entry 1", entries.get(0), log.get(1));
548 assertEquals("Entry 2", entries.get(1), log.get(2));
550 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
554 public void testHandleAppendEntriesAfterInstallingSnapshot(){
555 logStart("testHandleAppendAfterInstallingSnapshot");
557 MockRaftActorContext context = createActorContext();
559 // Prepare the receivers log
560 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
562 // Set up a log as if it has been snapshotted
563 log.setSnapshotIndex(3);
564 log.setSnapshotTerm(1);
566 context.setReplicatedLog(log);
568 // Prepare the entries to be sent with AppendEntries
569 List<ReplicatedLogEntry> entries = new ArrayList<>();
570 entries.add(newReplicatedLogEntry(1, 4, "four"));
572 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
574 follower = createBehavior(context);
576 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
578 Assert.assertSame(follower, newBehavior);
580 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
585 * This test verifies that when InstallSnapshot is received by
586 * the follower its applied correctly.
591 public void testHandleInstallSnapshot() throws Exception {
592 logStart("testHandleInstallSnapshot");
594 MockRaftActorContext context = createActorContext();
596 follower = createBehavior(context);
598 ByteString bsSnapshot = createSnapshot();
600 int snapshotLength = bsSnapshot.size();
602 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
603 int lastIncludedIndex = 1;
605 InstallSnapshot lastInstallSnapshot = null;
607 for(int i = 0; i < totalChunks; i++) {
608 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
609 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
610 chunkData, chunkIndex, totalChunks);
611 follower.handleMessage(leaderActor, lastInstallSnapshot);
612 offset = offset + 50;
617 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
618 ApplySnapshot.class);
619 Snapshot snapshot = applySnapshot.getSnapshot();
620 assertNotNull(lastInstallSnapshot);
621 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
622 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
623 snapshot.getLastAppliedTerm());
624 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
625 snapshot.getLastAppliedIndex());
626 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
627 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
629 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
630 leaderActor, InstallSnapshotReply.class);
631 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
634 for(InstallSnapshotReply reply: replies) {
635 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
636 assertEquals("getTerm", 1, reply.getTerm());
637 assertEquals("isSuccess", true, reply.isSuccess());
638 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
641 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
646 * Verify that when an AppendEntries is sent to a follower during a snapshot install
647 * the Follower short-circuits the processing of the AppendEntries message.
652 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
653 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
655 MockRaftActorContext context = createActorContext();
657 follower = createBehavior(context);
659 ByteString bsSnapshot = createSnapshot();
660 int snapshotLength = bsSnapshot.size();
662 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
663 int lastIncludedIndex = 1;
665 // Check that snapshot installation is not in progress
666 assertNull(((Follower) follower).getSnapshotTracker());
668 // Make sure that we have more than 1 chunk to send
669 assertTrue(totalChunks > 1);
671 // Send an install snapshot with the first chunk to start the process of installing a snapshot
672 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
673 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
674 chunkData, 1, totalChunks));
676 // Check if snapshot installation is in progress now
677 assertNotNull(((Follower) follower).getSnapshotTracker());
679 // Send an append entry
680 AppendEntries appendEntries = mock(AppendEntries.class);
681 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
683 follower.handleMessage(leaderActor, appendEntries);
685 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
686 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
687 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
688 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
690 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
691 verify(appendEntries, never()).getPrevLogIndex();
696 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
697 logStart("testInitialSyncUpWithHandleInstallSnapshot");
699 MockRaftActorContext context = createActorContext();
701 follower = createBehavior(context);
703 ByteString bsSnapshot = createSnapshot();
705 int snapshotLength = bsSnapshot.size();
707 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
708 int lastIncludedIndex = 1;
710 InstallSnapshot lastInstallSnapshot = null;
712 for(int i = 0; i < totalChunks; i++) {
713 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
714 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
715 chunkData, chunkIndex, totalChunks);
716 follower.handleMessage(leaderActor, lastInstallSnapshot);
717 offset = offset + 50;
722 FollowerInitialSyncUpStatus syncStatus =
723 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
725 assertFalse(syncStatus.isInitialSyncDone());
727 // Clear all the messages
728 followerActor.underlyingActor().clear();
730 context.setLastApplied(101);
731 context.setCommitIndex(101);
732 setLastLogEntry(context, 1, 101,
733 new MockRaftActorContext.MockPayload(""));
735 List<ReplicatedLogEntry> entries = Arrays.asList(
736 newReplicatedLogEntry(2, 101, "foo"));
738 // The new commitIndex is 101
739 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
740 follower.handleMessage(leaderActor, appendEntries);
742 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
744 assertTrue(syncStatus.isInitialSyncDone());
748 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
749 logStart("testHandleOutOfSequenceInstallSnapshot");
751 MockRaftActorContext context = createActorContext();
753 follower = createBehavior(context);
755 ByteString bsSnapshot = createSnapshot();
757 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
758 getNextChunk(bsSnapshot, 10, 50), 3, 3);
759 follower.handleMessage(leaderActor, installSnapshot);
761 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
762 InstallSnapshotReply.class);
764 assertEquals("isSuccess", false, reply.isSuccess());
765 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
766 assertEquals("getTerm", 1, reply.getTerm());
767 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
769 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
773 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
774 MockRaftActorContext context = createActorContext();
776 Stopwatch stopwatch = Stopwatch.createStarted();
778 follower = createBehavior(context);
780 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
782 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
784 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
788 public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
789 MockRaftActorContext context = createActorContext();
790 context.setConfigParams(new DefaultConfigParamsImpl(){
792 public FiniteDuration getElectionTimeOutInterval() {
793 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
797 context.setRaftPolicy(createRaftPolicy(false, false));
799 follower = createBehavior(context);
801 MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
805 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
806 int snapshotLength = bs.size();
808 int size = chunkSize;
809 if (chunkSize > snapshotLength) {
810 size = snapshotLength;
812 if ((start + chunkSize) > snapshotLength) {
813 size = snapshotLength - start;
816 return bs.substring(start, start + size);
819 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
820 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
822 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
823 AppendEntriesReply.class);
825 assertEquals("isSuccess", expSuccess, reply.isSuccess());
826 assertEquals("getTerm", expTerm, reply.getTerm());
827 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
828 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
829 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
830 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
833 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
834 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
835 new MockRaftActorContext.MockPayload(data));
838 private ByteString createSnapshot(){
839 HashMap<String, String> followerSnapshot = new HashMap<>();
840 followerSnapshot.put("1", "A");
841 followerSnapshot.put("2", "B");
842 followerSnapshot.put("3", "C");
844 return toByteString(followerSnapshot);
848 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
849 ActorRef actorRef, RaftRPC rpc) throws Exception {
850 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
852 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
853 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
857 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
859 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
860 assertEquals("isSuccess", true, reply.isSuccess());