1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.Snapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
31 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 public class FollowerTest extends AbstractRaftActorBehaviorTest {
43 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
44 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
46 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
47 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
49 private RaftActorBehavior follower;
51 private final short payloadVersion = 5;
55 public void tearDown() throws Exception {
56 if(follower != null) {
64 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
65 return new Follower(actorContext);
69 protected MockRaftActorContext createActorContext() {
70 return createActorContext(followerActor);
74 protected MockRaftActorContext createActorContext(ActorRef actorRef){
75 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
76 context.setPayloadVersion(payloadVersion );
81 public void testThatAnElectionTimeoutIsTriggered(){
82 MockRaftActorContext actorContext = createActorContext();
83 follower = new Follower(actorContext);
85 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
86 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
90 public void testHandleElectionTimeout(){
91 logStart("testHandleElectionTimeout");
93 follower = new Follower(createActorContext());
95 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
97 assertTrue(raftBehavior instanceof Candidate);
101 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
102 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
104 RaftActorContext context = createActorContext();
106 context.getTermInformation().update(term, null);
108 follower = createBehavior(context);
110 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
112 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
114 assertEquals("isVoteGranted", true, reply.isVoteGranted());
115 assertEquals("getTerm", term, reply.getTerm());
119 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
120 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
122 RaftActorContext context = createActorContext();
124 context.getTermInformation().update(term, "test");
126 follower = createBehavior(context);
128 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
130 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
132 assertEquals("isVoteGranted", false, reply.isVoteGranted());
137 public void testHandleFirstAppendEntries() throws Exception {
138 logStart("testHandleFirstAppendEntries");
140 MockRaftActorContext context = createActorContext();
142 List<ReplicatedLogEntry> entries = Arrays.asList(
143 newReplicatedLogEntry(2, 101, "foo"));
145 // The new commitIndex is 101
146 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
148 follower = createBehavior(context);
149 follower.handleMessage(leaderActor, appendEntries);
151 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
153 assertFalse(syncStatus.isInitialSyncDone());
157 public void testHandleSyncUpAppendEntries() throws Exception {
158 logStart("testHandleSyncUpAppendEntries");
160 MockRaftActorContext context = createActorContext();
162 List<ReplicatedLogEntry> entries = Arrays.asList(
163 newReplicatedLogEntry(2, 101, "foo"));
165 // The new commitIndex is 101
166 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
168 follower = createBehavior(context);
169 follower.handleMessage(leaderActor, appendEntries);
171 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
173 assertFalse(syncStatus.isInitialSyncDone());
175 // Clear all the messages
176 followerActor.underlyingActor().clear();
178 context.setLastApplied(101);
179 context.setCommitIndex(101);
180 setLastLogEntry(context, 1, 101,
181 new MockRaftActorContext.MockPayload(""));
183 entries = Arrays.asList(
184 newReplicatedLogEntry(2, 101, "foo"));
186 // The new commitIndex is 101
187 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
188 follower.handleMessage(leaderActor, appendEntries);
190 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
192 assertTrue(syncStatus.isInitialSyncDone());
194 followerActor.underlyingActor().clear();
196 // Sending the same message again should not generate another message
198 follower.handleMessage(leaderActor, appendEntries);
200 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
202 assertNull(syncStatus);
207 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
208 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
210 MockRaftActorContext context = createActorContext();
212 List<ReplicatedLogEntry> entries = Arrays.asList(
213 newReplicatedLogEntry(2, 101, "foo"));
215 // The new commitIndex is 101
216 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
218 follower = createBehavior(context);
219 follower.handleMessage(leaderActor, appendEntries);
221 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
223 assertFalse(syncStatus.isInitialSyncDone());
225 // Clear all the messages
226 followerActor.underlyingActor().clear();
228 context.setLastApplied(100);
229 setLastLogEntry(context, 1, 100,
230 new MockRaftActorContext.MockPayload(""));
232 entries = Arrays.asList(
233 newReplicatedLogEntry(2, 101, "foo"));
235 // leader-2 is becoming the leader now and it says the commitIndex is 45
236 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
237 follower.handleMessage(leaderActor, appendEntries);
239 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
241 // We get a new message saying initial status is not done
242 assertFalse(syncStatus.isInitialSyncDone());
248 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
249 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
251 MockRaftActorContext context = createActorContext();
253 List<ReplicatedLogEntry> entries = Arrays.asList(
254 newReplicatedLogEntry(2, 101, "foo"));
256 // The new commitIndex is 101
257 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
259 follower = createBehavior(context);
260 follower.handleMessage(leaderActor, appendEntries);
262 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
264 assertFalse(syncStatus.isInitialSyncDone());
266 // Clear all the messages
267 followerActor.underlyingActor().clear();
269 context.setLastApplied(101);
270 context.setCommitIndex(101);
271 setLastLogEntry(context, 1, 101,
272 new MockRaftActorContext.MockPayload(""));
274 entries = Arrays.asList(
275 newReplicatedLogEntry(2, 101, "foo"));
277 // The new commitIndex is 101
278 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
279 follower.handleMessage(leaderActor, appendEntries);
281 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
283 assertTrue(syncStatus.isInitialSyncDone());
285 // Clear all the messages
286 followerActor.underlyingActor().clear();
288 context.setLastApplied(100);
289 setLastLogEntry(context, 1, 100,
290 new MockRaftActorContext.MockPayload(""));
292 entries = Arrays.asList(
293 newReplicatedLogEntry(2, 101, "foo"));
295 // leader-2 is becoming the leader now and it says the commitIndex is 45
296 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
297 follower.handleMessage(leaderActor, appendEntries);
299 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
301 // We get a new message saying initial status is not done
302 assertFalse(syncStatus.isInitialSyncDone());
308 * This test verifies that when an AppendEntries RPC is received by a RaftActor
309 * with a commitIndex that is greater than what has been applied to the
310 * state machine of the RaftActor, the RaftActor applies the state and
311 * sets it current applied state to the commitIndex of the sender.
316 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
317 logStart("testHandleAppendEntriesWithNewerCommitIndex");
319 MockRaftActorContext context = createActorContext();
321 context.setLastApplied(100);
322 setLastLogEntry(context, 1, 100,
323 new MockRaftActorContext.MockPayload(""));
324 context.getReplicatedLog().setSnapshotIndex(99);
326 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
327 newReplicatedLogEntry(2, 101, "foo"));
329 // The new commitIndex is 101
330 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
332 follower = createBehavior(context);
333 follower.handleMessage(leaderActor, appendEntries);
335 assertEquals("getLastApplied", 101L, context.getLastApplied());
339 * This test verifies that when an AppendEntries is received a specific prevLogTerm
340 * which does not match the term that is in RaftActors log entry at prevLogIndex
341 * then the RaftActor does not change it's state and it returns a failure.
346 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
347 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
349 MockRaftActorContext context = createActorContext();
351 // First set the receivers term to lower number
352 context.getTermInformation().update(95, "test");
354 // AppendEntries is now sent with a bigger term
355 // this will set the receivers term to be the same as the sender's term
356 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
358 follower = createBehavior(context);
360 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
362 Assert.assertSame(follower, newBehavior);
364 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
365 AppendEntriesReply.class);
367 assertEquals("isSuccess", false, reply.isSuccess());
371 * This test verifies that when a new AppendEntries message is received with
372 * new entries and the logs of the sender and receiver match that the new
373 * entries get added to the log and the log is incremented by the number of
374 * entries received in appendEntries
379 public void testHandleAppendEntriesAddNewEntries() {
380 logStart("testHandleAppendEntriesAddNewEntries");
382 MockRaftActorContext context = createActorContext();
384 // First set the receivers term to lower number
385 context.getTermInformation().update(1, "test");
387 // Prepare the receivers log
388 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
389 log.append(newReplicatedLogEntry(1, 0, "zero"));
390 log.append(newReplicatedLogEntry(1, 1, "one"));
391 log.append(newReplicatedLogEntry(1, 2, "two"));
393 context.setReplicatedLog(log);
395 // Prepare the entries to be sent with AppendEntries
396 List<ReplicatedLogEntry> entries = new ArrayList<>();
397 entries.add(newReplicatedLogEntry(1, 3, "three"));
398 entries.add(newReplicatedLogEntry(1, 4, "four"));
400 // Send appendEntries with the same term as was set on the receiver
401 // before the new behavior was created (1 in this case)
402 // This will not work for a Candidate because as soon as a Candidate
403 // is created it increments the term
404 AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0);
406 follower = createBehavior(context);
408 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
410 Assert.assertSame(follower, newBehavior);
412 assertEquals("Next index", 5, log.last().getIndex() + 1);
413 assertEquals("Entry 3", entries.get(0), log.get(3));
414 assertEquals("Entry 4", entries.get(1), log.get(4));
416 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
420 * This test verifies that when a new AppendEntries message is received with
421 * new entries and the logs of the sender and receiver are out-of-sync that
422 * the log is first corrected by removing the out of sync entries from the
423 * log and then adding in the new entries sent with the AppendEntries message
426 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
427 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
429 MockRaftActorContext context = createActorContext();
431 // First set the receivers term to lower number
432 context.getTermInformation().update(1, "test");
434 // Prepare the receivers log
435 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
436 log.append(newReplicatedLogEntry(1, 0, "zero"));
437 log.append(newReplicatedLogEntry(1, 1, "one"));
438 log.append(newReplicatedLogEntry(1, 2, "two"));
440 context.setReplicatedLog(log);
442 // Prepare the entries to be sent with AppendEntries
443 List<ReplicatedLogEntry> entries = new ArrayList<>();
444 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
445 entries.add(newReplicatedLogEntry(2, 3, "three"));
447 // Send appendEntries with the same term as was set on the receiver
448 // before the new behavior was created (1 in this case)
449 // This will not work for a Candidate because as soon as a Candidate
450 // is created it increments the term
451 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
453 follower = createBehavior(context);
455 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
457 Assert.assertSame(follower, newBehavior);
459 // The entry at index 2 will be found out-of-sync with the leader
460 // and will be removed
461 // Then the two new entries will be added to the log
462 // Thus making the log to have 4 entries
463 assertEquals("Next index", 4, log.last().getIndex() + 1);
464 //assertEquals("Entry 2", entries.get(0), log.get(2));
466 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
468 // Check that the entry at index 2 has the new data
469 assertEquals("Entry 2", entries.get(0), log.get(2));
471 assertEquals("Entry 3", entries.get(1), log.get(3));
473 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
477 public void testHandleAppendEntriesPreviousLogEntryMissing(){
478 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
480 MockRaftActorContext context = createActorContext();
482 // Prepare the receivers log
483 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
484 log.append(newReplicatedLogEntry(1, 0, "zero"));
485 log.append(newReplicatedLogEntry(1, 1, "one"));
486 log.append(newReplicatedLogEntry(1, 2, "two"));
488 context.setReplicatedLog(log);
490 // Prepare the entries to be sent with AppendEntries
491 List<ReplicatedLogEntry> entries = new ArrayList<>();
492 entries.add(newReplicatedLogEntry(1, 4, "four"));
494 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
496 follower = createBehavior(context);
498 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
500 Assert.assertSame(follower, newBehavior);
502 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
506 public void testHandleAppendEntriesWithExistingLogEntry() {
507 logStart("testHandleAppendEntriesWithExistingLogEntry");
509 MockRaftActorContext context = createActorContext();
511 context.getTermInformation().update(1, "test");
513 // Prepare the receivers log
514 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
515 log.append(newReplicatedLogEntry(1, 0, "zero"));
516 log.append(newReplicatedLogEntry(1, 1, "one"));
518 context.setReplicatedLog(log);
520 // Send the last entry again.
521 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
523 follower = createBehavior(context);
525 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
527 assertEquals("Next index", 2, log.last().getIndex() + 1);
528 assertEquals("Entry 1", entries.get(0), log.get(1));
530 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
532 // Send the last entry again and also a new one.
534 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
536 leaderActor.underlyingActor().clear();
537 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
539 assertEquals("Next index", 3, log.last().getIndex() + 1);
540 assertEquals("Entry 1", entries.get(0), log.get(1));
541 assertEquals("Entry 2", entries.get(1), log.get(2));
543 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
547 public void testHandleAppendEntriesAfterInstallingSnapshot(){
548 logStart("testHandleAppendAfterInstallingSnapshot");
550 MockRaftActorContext context = createActorContext();
552 // Prepare the receivers log
553 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
555 // Set up a log as if it has been snapshotted
556 log.setSnapshotIndex(3);
557 log.setSnapshotTerm(1);
559 context.setReplicatedLog(log);
561 // Prepare the entries to be sent with AppendEntries
562 List<ReplicatedLogEntry> entries = new ArrayList<>();
563 entries.add(newReplicatedLogEntry(1, 4, "four"));
565 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
567 follower = createBehavior(context);
569 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
571 Assert.assertSame(follower, newBehavior);
573 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
578 * This test verifies that when InstallSnapshot is received by
579 * the follower its applied correctly.
584 public void testHandleInstallSnapshot() throws Exception {
585 logStart("testHandleInstallSnapshot");
587 MockRaftActorContext context = createActorContext();
589 follower = createBehavior(context);
591 ByteString bsSnapshot = createSnapshot();
593 int snapshotLength = bsSnapshot.size();
595 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
596 int lastIncludedIndex = 1;
598 InstallSnapshot lastInstallSnapshot = null;
600 for(int i = 0; i < totalChunks; i++) {
601 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
602 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
603 chunkData, chunkIndex, totalChunks);
604 follower.handleMessage(leaderActor, lastInstallSnapshot);
605 offset = offset + 50;
610 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
611 ApplySnapshot.class);
612 Snapshot snapshot = applySnapshot.getSnapshot();
613 assertNotNull(lastInstallSnapshot);
614 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
615 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
616 snapshot.getLastAppliedTerm());
617 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
618 snapshot.getLastAppliedIndex());
619 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
620 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
622 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
623 leaderActor, InstallSnapshotReply.class);
624 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
627 for(InstallSnapshotReply reply: replies) {
628 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
629 assertEquals("getTerm", 1, reply.getTerm());
630 assertEquals("isSuccess", true, reply.isSuccess());
631 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
634 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
639 * Verify that when an AppendEntries is sent to a follower during a snapshot install
640 * the Follower short-circuits the processing of the AppendEntries message.
645 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
646 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
648 MockRaftActorContext context = createActorContext();
650 follower = createBehavior(context);
652 ByteString bsSnapshot = createSnapshot();
653 int snapshotLength = bsSnapshot.size();
655 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
656 int lastIncludedIndex = 1;
658 // Check that snapshot installation is not in progress
659 assertNull(((Follower) follower).getSnapshotTracker());
661 // Make sure that we have more than 1 chunk to send
662 assertTrue(totalChunks > 1);
664 // Send an install snapshot with the first chunk to start the process of installing a snapshot
665 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
666 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
667 chunkData, 1, totalChunks));
669 // Check if snapshot installation is in progress now
670 assertNotNull(((Follower) follower).getSnapshotTracker());
672 // Send an append entry
673 AppendEntries appendEntries = mock(AppendEntries.class);
674 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
676 follower.handleMessage(leaderActor, appendEntries);
678 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
679 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
680 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
681 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
683 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
684 verify(appendEntries, never()).getPrevLogIndex();
689 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
690 logStart("testInitialSyncUpWithHandleInstallSnapshot");
692 MockRaftActorContext context = createActorContext();
694 follower = createBehavior(context);
696 ByteString bsSnapshot = createSnapshot();
698 int snapshotLength = bsSnapshot.size();
700 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
701 int lastIncludedIndex = 1;
703 InstallSnapshot lastInstallSnapshot = null;
705 for(int i = 0; i < totalChunks; i++) {
706 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
707 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
708 chunkData, chunkIndex, totalChunks);
709 follower.handleMessage(leaderActor, lastInstallSnapshot);
710 offset = offset + 50;
715 FollowerInitialSyncUpStatus syncStatus =
716 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
718 assertFalse(syncStatus.isInitialSyncDone());
720 // Clear all the messages
721 followerActor.underlyingActor().clear();
723 context.setLastApplied(101);
724 context.setCommitIndex(101);
725 setLastLogEntry(context, 1, 101,
726 new MockRaftActorContext.MockPayload(""));
728 List<ReplicatedLogEntry> entries = Arrays.asList(
729 newReplicatedLogEntry(2, 101, "foo"));
731 // The new commitIndex is 101
732 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
733 follower.handleMessage(leaderActor, appendEntries);
735 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
737 assertTrue(syncStatus.isInitialSyncDone());
741 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
742 logStart("testHandleOutOfSequenceInstallSnapshot");
744 MockRaftActorContext context = createActorContext();
746 follower = createBehavior(context);
748 ByteString bsSnapshot = createSnapshot();
750 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
751 getNextChunk(bsSnapshot, 10, 50), 3, 3);
752 follower.handleMessage(leaderActor, installSnapshot);
754 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
755 InstallSnapshotReply.class);
757 assertEquals("isSuccess", false, reply.isSuccess());
758 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
759 assertEquals("getTerm", 1, reply.getTerm());
760 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
762 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
766 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
767 MockRaftActorContext context = createActorContext();
769 Stopwatch stopwatch = Stopwatch.createStarted();
771 follower = createBehavior(context);
773 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
775 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
777 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
780 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
781 int snapshotLength = bs.size();
783 int size = chunkSize;
784 if (chunkSize > snapshotLength) {
785 size = snapshotLength;
787 if ((start + chunkSize) > snapshotLength) {
788 size = snapshotLength - start;
791 return bs.substring(start, start + size);
794 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
795 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
797 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
798 AppendEntriesReply.class);
800 assertEquals("isSuccess", expSuccess, reply.isSuccess());
801 assertEquals("getTerm", expTerm, reply.getTerm());
802 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
803 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
804 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
805 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
808 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
809 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
810 new MockRaftActorContext.MockPayload(data));
813 private ByteString createSnapshot(){
814 HashMap<String, String> followerSnapshot = new HashMap<>();
815 followerSnapshot.put("1", "A");
816 followerSnapshot.put("2", "B");
817 followerSnapshot.put("3", "C");
819 return toByteString(followerSnapshot);
823 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
824 ActorRef actorRef, RaftRPC rpc) throws Exception {
825 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
827 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
828 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
832 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
834 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
835 assertEquals("isSuccess", true, reply.isSuccess());