1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.Snapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
31 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
/**
 * Unit tests for the {@link Follower} behavior, layered on the shared checks inherited from
 * {@code AbstractRaftActorBehaviorTest}.
 */
41 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Collector actor handed to the mock context as the follower's own actor; tests read
// ElectionTimeout, FollowerInitialSyncUpStatus and ApplySnapshot messages from it.
43 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
44 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Collector actor passed to handleMessage for the RPCs under test; the follower's replies
// (RequestVoteReply, AppendEntriesReply, InstallSnapshotReply) are read back from it.
46 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
47 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// Behavior under test; each test assigns it and tearDown() checks it for null.
49 private RaftActorBehavior follower;
// Payload version set on every context built by createActorContext(ActorRef) and expected
// back in AppendEntriesReply by expectAndVerifyAppendEntriesReply.
51 private final short payloadVersion = 5;
// Cleanup hook: the follower is only touched when a test actually created one.
// NOTE(review): the statements guarded by this null check are not visible here - confirm what is released.
55 public void tearDown() throws Exception {
56 if(follower != null) {
/**
 * Creates the behavior under test.
 *
 * @param actorContext the context the new behavior operates on
 * @return a new {@link Follower} constructed with {@code actorContext}
 */
64 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
65 return new Follower(actorContext);
/**
 * Creates a mock context whose actor is {@code followerActor}.
 *
 * @return the result of {@link #createActorContext(ActorRef)} for {@code followerActor}
 */
69 protected MockRaftActorContext createActorContext() {
70 return createActorContext(followerActor);
/**
 * Builds a {@link MockRaftActorContext} for the given actor and sets this test's
 * {@code payloadVersion} on it.
 *
 * @param actorRef the actor passed to the mock context
 * @return presumably the configured context - the return statement is not visible here, confirm
 */
74 protected MockRaftActorContext createActorContext(ActorRef actorRef){
// The string "follower" is presumably the context id that tests later compare against
// reply.getFollowerId() via context.getId() - confirm against MockRaftActorContext.
75 MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
76 context.setPayloadVersion(payloadVersion );
/**
 * Verifies that constructing a {@link Follower} results in an {@link ElectionTimeout}
 * message arriving at the follower's own actor.
 */
81 public void testThatAnElectionTimeoutIsTriggered(){
82 MockRaftActorContext actorContext = createActorContext();
83 follower = new Follower(actorContext);
// Third argument is six times the configured election timeout interval, in millis
// (presumably the maximum time to wait - confirm against MessageCollectorActor).
85 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
86 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
/**
 * Verifies that handling an {@link ElectionTimeout} makes the follower return a
 * {@code Candidate} behavior.
 */
90 public void testHandleElectionTimeout(){
91 logStart("testHandleElectionTimeout");
93 follower = new Follower(createActorContext());
95 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
// The returned behavior, not the original follower, is what gets checked.
97 assertTrue(raftBehavior instanceof Candidate);
/**
 * Verifies that a {@link RequestVote} carrying the follower's current term is granted when the
 * follower's term information was updated with a null second argument (no vote recorded), and
 * that the reply echoes that term.
 */
101 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
102 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
104 RaftActorContext context = createActorContext();
// NOTE(review): `term` is declared on a line not visible here.
106 context.getTermInformation().update(term, null);
108 follower = createBehavior(context);
110 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
// The reply is collected by the actor that was passed in with the request.
112 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
114 assertEquals("isVoteGranted", true, reply.isVoteGranted());
115 assertEquals("getTerm", term, reply.getTerm());
/**
 * Verifies that a {@link RequestVote} from "candidate" at the follower's current term is
 * refused when the follower's term information already records "test" for that term.
 */
119 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
120 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
122 RaftActorContext context = createActorContext();
// NOTE(review): `term` is declared on a line not visible here.
124 context.getTermInformation().update(term, "test");
126 follower = createBehavior(context);
// The requesting id ("candidate") differs from the id recorded above ("test").
128 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
130 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
132 assertEquals("isVoteGranted", false, reply.isVoteGranted());
/**
 * Verifies that the first {@link AppendEntries} handled by a new follower makes it send a
 * {@link FollowerInitialSyncUpStatus} to its own actor reporting that initial sync is not done.
 */
137 public void testHandleFirstAppendEntries() throws Exception {
138 logStart("testHandleFirstAppendEntries");
140 MockRaftActorContext context = createActorContext();
142 List<ReplicatedLogEntry> entries = Arrays.asList(
143 newReplicatedLogEntry(2, 101, "foo"));
145 // The new commitIndex is 101
146 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
148 follower = createBehavior(context);
149 follower.handleMessage(leaderActor, appendEntries);
// The status notification is read from the follower's own actor, not from the leader's.
151 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
153 assertFalse(syncStatus.isInitialSyncDone());
/**
 * Verifies the sequence of initial sync-up notifications: the first {@link AppendEntries}
 * reports "not done", a later one handled after the context's lastApplied/commitIndex were
 * moved to 101 reports "done", and repeating that message produces no further notification.
 */
157 public void testHandleSyncUpAppendEntries() throws Exception {
158 logStart("testHandleSyncUpAppendEntries");
160 MockRaftActorContext context = createActorContext();
162 List<ReplicatedLogEntry> entries = Arrays.asList(
163 newReplicatedLogEntry(2, 101, "foo"));
165 // The new commitIndex is 101
166 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
168 follower = createBehavior(context);
169 follower.handleMessage(leaderActor, appendEntries);
171 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
173 assertFalse(syncStatus.isInitialSyncDone());
175 // Clear all the messages
176 followerActor.underlyingActor().clear();
// Put the follower's state at index 101 before the next message is handled.
178 context.setLastApplied(101);
179 context.setCommitIndex(101);
180 setLastLogEntry(context, 1, 101,
181 new MockRaftActorContext.MockPayload(""));
183 entries = Arrays.asList(
184 newReplicatedLogEntry(2, 101, "foo"));
186 // The new commitIndex is 102
187 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
188 follower.handleMessage(leaderActor, appendEntries);
190 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
192 assertTrue(syncStatus.isInitialSyncDone());
194 followerActor.underlyingActor().clear();
196 // Sending the same message again should not generate another message
198 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (unlike expectFirstMatching above) is used because a null result is the expected outcome.
200 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
202 assertNull(syncStatus);
/**
 * Verifies that when an {@link AppendEntries} from a different leader id ("leader-2") is
 * handled while initial sync-up has only been reported as not done, the follower sends a
 * fresh {@link FollowerInitialSyncUpStatus} that again reports "not done".
 */
207 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
208 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
210 MockRaftActorContext context = createActorContext();
212 List<ReplicatedLogEntry> entries = Arrays.asList(
213 newReplicatedLogEntry(2, 101, "foo"));
215 // The new commitIndex is 101
216 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
218 follower = createBehavior(context);
219 follower.handleMessage(leaderActor, appendEntries);
221 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
223 assertFalse(syncStatus.isInitialSyncDone());
225 // Clear all the messages
226 followerActor.underlyingActor().clear();
228 context.setLastApplied(100);
229 setLastLogEntry(context, 1, 100,
230 new MockRaftActorContext.MockPayload(""));
232 entries = Arrays.asList(
233 newReplicatedLogEntry(2, 101, "foo"));
235 // leader-2 is becoming the leader now and it says the commitIndex is 46
236 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
237 follower.handleMessage(leaderActor, appendEntries);
239 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
241 // We get a new message saying initial status is not done
242 assertFalse(syncStatus.isInitialSyncDone());
/**
 * Verifies that after initial sync-up with "leader-1" has been reported as done, an
 * {@link AppendEntries} from a different leader id ("leader-2") makes the follower send a new
 * {@link FollowerInitialSyncUpStatus} reporting "not done" again.
 */
248 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
249 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
251 MockRaftActorContext context = createActorContext();
253 List<ReplicatedLogEntry> entries = Arrays.asList(
254 newReplicatedLogEntry(2, 101, "foo"));
256 // The new commitIndex is 101
257 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
259 follower = createBehavior(context);
260 follower.handleMessage(leaderActor, appendEntries);
262 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
264 assertFalse(syncStatus.isInitialSyncDone());
266 // Clear all the messages
267 followerActor.underlyingActor().clear();
// Put the follower's state at index 101 so the next message completes the sync-up.
269 context.setLastApplied(101);
270 context.setCommitIndex(101);
271 setLastLogEntry(context, 1, 101,
272 new MockRaftActorContext.MockPayload(""));
274 entries = Arrays.asList(
275 newReplicatedLogEntry(2, 101, "foo"));
277 // The new commitIndex is 102
278 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
279 follower.handleMessage(leaderActor, appendEntries);
281 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
283 assertTrue(syncStatus.isInitialSyncDone());
285 // Clear all the messages
286 followerActor.underlyingActor().clear();
288 context.setLastApplied(100);
289 setLastLogEntry(context, 1, 100,
290 new MockRaftActorContext.MockPayload(""));
292 entries = Arrays.asList(
293 newReplicatedLogEntry(2, 101, "foo"));
295 // leader-2 is becoming the leader now and it says the commitIndex is 46
296 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
297 follower.handleMessage(leaderActor, appendEntries);
299 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
301 // We get a new message saying initial status is not done
302 assertFalse(syncStatus.isInitialSyncDone());
308 * This test verifies that when an AppendEntries RPC is received by a RaftActor
309 * with a commitIndex that is greater than what has been applied to the
310 * state machine of the RaftActor, the RaftActor applies the state and
311 * sets its current applied state to the commitIndex of the sender.
316 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
317 logStart("testHandleAppendEntriesWithNewerCommitIndex");
319 MockRaftActorContext context = createActorContext();
// Starting state: lastApplied 100, a last log entry set via setLastLogEntry(context, 1, 100, ...)
// (presumably term 1, index 100 - helper not visible here) and snapshot index 99.
321 context.setLastApplied(100);
322 setLastLogEntry(context, 1, 100,
323 new MockRaftActorContext.MockPayload(""));
324 context.getReplicatedLog().setSnapshotIndex(99);
326 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
327 newReplicatedLogEntry(2, 101, "foo"));
329 // The new commitIndex is 101
330 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
332 follower = createBehavior(context);
333 follower.handleMessage(leaderActor, appendEntries);
// lastApplied must have moved from 100 to the commit index carried by the message.
335 assertEquals("getLastApplied", 101L, context.getLastApplied());
339 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
340 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
341 * then the RaftActor does not change its state and it returns a failure.
346 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
347 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
349 MockRaftActorContext context = createActorContext();
351 // First set the receiver's term to a lower number
352 context.getTermInformation().update(95, "test");
354 // AppendEntries is now sent with a bigger term
355 // this will set the receiver's term to be the same as the sender's term
356 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
358 follower = createBehavior(context);
360 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// The very same behavior instance must be returned, i.e. no behavior switch.
362 Assert.assertSame(follower, newBehavior);
364 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
365 AppendEntriesReply.class);
367 assertEquals("isSuccess", false, reply.isSuccess());
371 * This test verifies that when a new AppendEntries message is received with
372 * new entries and the logs of the sender and receiver match that the new
373 * entries get added to the log and the log is incremented by the number of
374 * entries received in appendEntries
379 public void testHandleAppendEntriesAddNewEntries() {
380 logStart("testHandleAppendEntriesAddNewEntries");
382 MockRaftActorContext context = createActorContext();
384 // Set the receiver's term to 1, the same term the AppendEntries below carries
385 context.getTermInformation().update(1, "test");
387 // Prepare the receiver's log
388 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
389 log.append(newReplicatedLogEntry(1, 0, "zero"));
390 log.append(newReplicatedLogEntry(1, 1, "one"));
391 log.append(newReplicatedLogEntry(1, 2, "two"));
393 context.setReplicatedLog(log);
395 // Prepare the entries to be sent with AppendEntries
396 List<ReplicatedLogEntry> entries = new ArrayList<>();
397 entries.add(newReplicatedLogEntry(1, 3, "three"));
398 entries.add(newReplicatedLogEntry(1, 4, "four"));
400 // Send appendEntries with the same term as was set on the receiver
401 // before the new behavior was created (1 in this case)
402 // This will not work for a Candidate because as soon as a Candidate
403 // is created it increments the term
404 short leaderPayloadVersion = 10;
405 String leaderId = "leader-1";
406 AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
408 follower = createBehavior(context);
410 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
412 Assert.assertSame(follower, newBehavior);
// The log held indices 0..2; both new entries (3 and 4) must now be present.
414 assertEquals("Next index", 5, log.last().getIndex() + 1);
415 assertEquals("Entry 3", entries.get(0), log.get(3));
416 assertEquals("Entry 4", entries.get(1), log.get(4));
// The behavior must expose the payload version and leader id carried by the message.
418 assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
419 assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
// Expected reply: term 1, success, last log term 1, last log index 4.
421 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
425 * This test verifies that when a new AppendEntries message is received with
426 * new entries and the logs of the sender and receiver are out-of-sync that
427 * the log is first corrected by removing the out of sync entries from the
428 * log and then adding in the new entries sent with the AppendEntries message
431 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
432 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
434 MockRaftActorContext context = createActorContext();
436 // First set the receiver's term to a lower number
437 context.getTermInformation().update(1, "test");
439 // Prepare the receiver's log
440 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
441 log.append(newReplicatedLogEntry(1, 0, "zero"));
442 log.append(newReplicatedLogEntry(1, 1, "one"));
443 log.append(newReplicatedLogEntry(1, 2, "two"));
445 context.setReplicatedLog(log);
447 // Prepare the entries to be sent with AppendEntries
448 List<ReplicatedLogEntry> entries = new ArrayList<>();
449 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
450 entries.add(newReplicatedLogEntry(2, 3, "three"));
452 // Send appendEntries with a newer term (2) than the one set on the
453 // receiver above (1). Its first entry re-uses index 2 with term 2, which
454 // conflicts with the receiver's existing term-1 entry at that index, so
455 // the follower has to correct its log before appending.
456 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
458 follower = createBehavior(context);
460 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
462 Assert.assertSame(follower, newBehavior);
464 // The entry at index 2 will be found out-of-sync with the leader
465 // and will be removed
466 // Then the two new entries will be added to the log
467 // so the log ends up with 4 entries (indices 0 to 3)
468 assertEquals("Next index", 4, log.last().getIndex() + 1);
// NOTE(review): the commented-out assertion below duplicates the live "Entry 2" assertion
// further down; consider deleting it.
469 //assertEquals("Entry 2", entries.get(0), log.get(2));
// The entry before the conflict must be untouched.
471 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
473 // Check that the entry at index 2 has the new data
474 assertEquals("Entry 2", entries.get(0), log.get(2));
476 assertEquals("Entry 3", entries.get(1), log.get(3));
// Expected reply: term 2, success, last log term 2, last log index 3.
478 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
/**
 * Verifies that an {@link AppendEntries} referring to a previous log entry the receiver does
 * not have is rejected: the receiver's log ends at index 2, the message carries 3 in the
 * position used for the previous index, the same behavior instance is returned and the reply
 * reports failure together with the receiver's last log term (1) and index (2).
 */
482 public void testHandleAppendEntriesPreviousLogEntryMissing(){
483 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
485 MockRaftActorContext context = createActorContext();
487 // Prepare the receiver's log
488 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
489 log.append(newReplicatedLogEntry(1, 0, "zero"));
490 log.append(newReplicatedLogEntry(1, 1, "one"));
491 log.append(newReplicatedLogEntry(1, 2, "two"));
493 context.setReplicatedLog(log);
495 // Prepare the entries to be sent with AppendEntries
496 List<ReplicatedLogEntry> entries = new ArrayList<>();
497 entries.add(newReplicatedLogEntry(1, 4, "four"));
499 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
501 follower = createBehavior(context);
503 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
505 Assert.assertSame(follower, newBehavior);
// Expected reply: term 1, failure, last log term 1, last log index 2 (log unchanged).
507 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
/**
 * Verifies that an {@link AppendEntries} repeating an entry the receiver already holds is
 * answered with success without growing the log, and that repeating it together with one
 * additional entry grows the log by exactly that additional entry.
 */
511 public void testHandleAppendEntriesWithExistingLogEntry() {
512 logStart("testHandleAppendEntriesWithExistingLogEntry");
514 MockRaftActorContext context = createActorContext();
516 context.getTermInformation().update(1, "test");
518 // Prepare the receiver's log
519 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
520 log.append(newReplicatedLogEntry(1, 0, "zero"));
521 log.append(newReplicatedLogEntry(1, 1, "one"));
523 context.setReplicatedLog(log);
525 // Send the last entry again.
526 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
528 follower = createBehavior(context);
530 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
// The log still ends at index 1.
532 assertEquals("Next index", 2, log.last().getIndex() + 1);
533 assertEquals("Entry 1", entries.get(0), log.get(1));
535 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
537 // Send the last entry again and also a new one.
539 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the helper below picks up the reply to the second message.
541 leaderActor.underlyingActor().clear();
542 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
544 assertEquals("Next index", 3, log.last().getIndex() + 1);
545 assertEquals("Entry 1", entries.get(0), log.get(1));
546 assertEquals("Entry 2", entries.get(1), log.get(2));
548 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
/**
 * Verifies that an {@link AppendEntries} is accepted when the receiver's log contains no
 * entries but has snapshot index 3 and snapshot term 1, matching the 3 and 1 sent in the
 * message; the reply must report success with last log term 1 and last log index 4.
 */
552 public void testHandleAppendEntriesAfterInstallingSnapshot(){
// NOTE(review): the label passed to logStart omits "Entries" and so differs from this
// method's name - confirm whether that is intentional.
553 logStart("testHandleAppendAfterInstallingSnapshot");
555 MockRaftActorContext context = createActorContext();
557 // Prepare the receiver's log
558 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
560 // Set up a log as if it has been snapshotted
561 log.setSnapshotIndex(3);
562 log.setSnapshotTerm(1);
564 context.setReplicatedLog(log);
566 // Prepare the entries to be sent with AppendEntries
567 List<ReplicatedLogEntry> entries = new ArrayList<>();
568 entries.add(newReplicatedLogEntry(1, 4, "four"));
570 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
572 follower = createBehavior(context);
574 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
576 Assert.assertSame(follower, newBehavior);
578 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
583 * This test verifies that when InstallSnapshot is received by
584 * the follower it is applied correctly.
589 public void testHandleInstallSnapshot() throws Exception {
590 logStart("testHandleInstallSnapshot");
592 MockRaftActorContext context = createActorContext();
594 follower = createBehavior(context);
596 ByteString bsSnapshot = createSnapshot();
598 int snapshotLength = bsSnapshot.size();
// Number of chunks = snapshotLength / chunkSize, rounded up.
// NOTE(review): chunkSize, offset and chunkIndex are declared on lines not visible here.
600 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
601 int lastIncludedIndex = 1;
603 InstallSnapshot lastInstallSnapshot = null;
// Feed the snapshot to the follower one chunk at a time.
605 for(int i = 0; i < totalChunks; i++) {
606 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
607 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
608 chunkData, chunkIndex, totalChunks);
609 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the offset advances by a literal 50 while the chunk length is chunkSize;
// the two only agree if chunkSize is 50 - confirm.
610 offset = offset + 50;
// Once all chunks were handled the follower is expected to send itself an ApplySnapshot
// whose snapshot matches the last InstallSnapshot and the original bytes.
615 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
616 ApplySnapshot.class);
617 Snapshot snapshot = applySnapshot.getSnapshot();
618 assertNotNull(lastInstallSnapshot);
619 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
620 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
621 snapshot.getLastAppliedTerm());
622 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
623 snapshot.getLastAppliedIndex());
624 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
625 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
// Exactly one reply per chunk must have been collected on the leader side.
627 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
628 leaderActor, InstallSnapshotReply.class);
629 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
// Replies are expected in ascending chunk order; chunkIndex is presumably reset before
// this loop on a line not visible here - confirm.
632 for(InstallSnapshotReply reply: replies) {
633 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
634 assertEquals("getTerm", 1, reply.getTerm());
635 assertEquals("isSuccess", true, reply.isSuccess());
636 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
// No snapshot tracker may remain after the installation has finished.
639 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
644 * Verify that when an AppendEntries is sent to a follower during a snapshot install
645 * the Follower short-circuits the processing of the AppendEntries message.
650 public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
651 logStart("testReceivingAppendEntriesDuringInstallSnapshot");
653 MockRaftActorContext context = createActorContext();
655 follower = createBehavior(context);
657 ByteString bsSnapshot = createSnapshot();
658 int snapshotLength = bsSnapshot.size();
// Number of chunks = snapshotLength / chunkSize, rounded up.
// NOTE(review): chunkSize is declared on a line not visible here.
660 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
661 int lastIncludedIndex = 1;
663 // Check that snapshot installation is not in progress
664 assertNull(((Follower) follower).getSnapshotTracker());
666 // Make sure that we have more than 1 chunk to send
667 assertTrue(totalChunks > 1);
669 // Send an install snapshot with the first chunk to start the process of installing a snapshot
670 ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
671 follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
672 chunkData, 1, totalChunks));
674 // Check if snapshot installation is in progress now
675 assertNotNull(((Follower) follower).getSnapshotTracker());
677 // Send an append entry
// A Mockito mock is used so that the accessors the follower calls can be verified below;
// only getTerm() is stubbed, returning the follower's current term.
678 AppendEntries appendEntries = mock(AppendEntries.class);
679 doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
681 follower.handleMessage(leaderActor, appendEntries);
// The reply must simply mirror the follower's current log position and term.
683 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
684 assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
685 assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
686 assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
688 // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
689 verify(appendEntries, never()).getPrevLogIndex();
/**
 * Verifies the initial sync-up notifications when the follower first receives a complete
 * snapshot in chunks and afterwards an {@link AppendEntries}: the snapshot installation
 * yields a {@link FollowerInitialSyncUpStatus} reporting "not done", and the AppendEntries
 * handled after the context was moved to index 101 yields one reporting "done".
 */
694 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
// NOTE(review): the label passed to logStart is shorter than this method's name
// ("FollowedByAppendEntries" is missing) - confirm whether that is intentional.
695 logStart("testInitialSyncUpWithHandleInstallSnapshot");
697 MockRaftActorContext context = createActorContext();
699 follower = createBehavior(context);
701 ByteString bsSnapshot = createSnapshot();
703 int snapshotLength = bsSnapshot.size();
// Number of chunks = snapshotLength / chunkSize, rounded up.
// NOTE(review): chunkSize, offset and chunkIndex are declared on lines not visible here.
705 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
706 int lastIncludedIndex = 1;
708 InstallSnapshot lastInstallSnapshot = null;
710 for(int i = 0; i < totalChunks; i++) {
711 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
712 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
713 chunkData, chunkIndex, totalChunks);
714 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): literal 50 versus chunkSize above - the two only agree if chunkSize is 50.
715 offset = offset + 50;
720 FollowerInitialSyncUpStatus syncStatus =
721 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
723 assertFalse(syncStatus.isInitialSyncDone());
725 // Clear all the messages
726 followerActor.underlyingActor().clear();
728 context.setLastApplied(101);
729 context.setCommitIndex(101);
730 setLastLogEntry(context, 1, 101,
731 new MockRaftActorContext.MockPayload(""));
733 List<ReplicatedLogEntry> entries = Arrays.asList(
734 newReplicatedLogEntry(2, 101, "foo"));
736 // The new commitIndex is 102
737 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
738 follower.handleMessage(leaderActor, appendEntries);
740 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
742 assertTrue(syncStatus.isInitialSyncDone());
/**
 * Verifies that an {@link InstallSnapshot} whose chunk index is 3 (of 3 total) is rejected
 * when it is the first chunk the follower sees: the reply reports failure with chunk index
 * -1, term 1 and the follower's id, and no snapshot tracker is left on the follower.
 */
746 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
747 logStart("testHandleOutOfSequenceInstallSnapshot");
749 MockRaftActorContext context = createActorContext();
751 follower = createBehavior(context);
753 ByteString bsSnapshot = createSnapshot();
// Last two constructor arguments are chunk index 3 and total chunks 3, sent without
// chunks 1 and 2 having been sent first.
755 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
756 getNextChunk(bsSnapshot, 10, 50), 3, 3);
757 follower.handleMessage(leaderActor, installSnapshot);
759 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
760 InstallSnapshotReply.class);
762 assertEquals("isSuccess", false, reply.isSuccess());
763 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
764 assertEquals("getTerm", 1, reply.getTerm());
765 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
767 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
/**
 * Verifies that a follower created from the default test context receives its
 * {@link ElectionTimeout} in less than one configured election-timeout interval.
 * NOTE(review): the "no peers" precondition in the name is presumably a property of the
 * default mock context - nothing in this method sets it; confirm.
 */
771 public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
772 MockRaftActorContext context = createActorContext();
// Timing starts before the behavior is created so the measurement includes scheduling.
774 Stopwatch stopwatch = Stopwatch.createStarted();
776 follower = createBehavior(context);
778 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
780 long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
782 assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
/**
 * Cuts one chunk out of {@code bs}.
 *
 * <p>The chunk length starts at {@code chunkSize}; it becomes the whole length of {@code bs}
 * when {@code chunkSize} exceeds it, and the remaining {@code snapshotLength - start} bytes
 * when {@code start + chunkSize} overshoots the end. Whether the second check is an
 * else-branch of the first is not visible here.
 *
 * <p>NOTE(review): {@code start} is declared on a line not visible here - presumably it is
 * initialised from {@code offset}; confirm.
 *
 * @param bs the bytes to take the chunk from
 * @param offset position of the chunk within {@code bs} (see the note on {@code start})
 * @param chunkSize requested chunk length
 * @return {@code bs.substring(start, start + size)}
 */
785 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
786 int snapshotLength = bs.size();
788 int size = chunkSize;
789 if (chunkSize > snapshotLength) {
790 size = snapshotLength;
792 if ((start + chunkSize) > snapshotLength) {
793 size = snapshotLength - start;
796 return bs.substring(start, start + size);
/**
 * Takes the first {@link AppendEntriesReply} collected by {@code leaderActor} and asserts
 * each of its fields, including that its payload version equals this test's
 * {@code payloadVersion}.
 *
 * @param expTerm expected {@code getTerm()}
 * @param expSuccess expected {@code isSuccess()}
 * @param expFollowerId expected {@code getFollowerId()}
 * @param expLogLastTerm expected {@code getLogLastTerm()}
 * @param expLogLastIndex expected {@code getLogLastIndex()}
 */
799 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
800 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
802 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
803 AppendEntriesReply.class);
805 assertEquals("isSuccess", expSuccess, reply.isSuccess());
806 assertEquals("getTerm", expTerm, reply.getTerm());
807 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
808 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
809 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
810 assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
/**
 * Convenience factory for a mock log entry.
 *
 * @param term first constructor argument of the mock entry
 * @param index second constructor argument of the mock entry
 * @param data string wrapped in a {@code MockPayload}
 * @return a new {@code MockReplicatedLogEntry} built from the three arguments
 */
813 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
814 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
815 new MockRaftActorContext.MockPayload(data));
/**
 * Builds the snapshot payload used by the install-snapshot tests: a three-entry map
 * ("1"->"A", "2"->"B", "3"->"C") converted with {@code toByteString}, a helper defined
 * outside the lines visible here.
 *
 * @return the map converted to a {@link ByteString}
 */
818 private ByteString createSnapshot(){
819 HashMap<String, String> followerSnapshot = new HashMap<>();
820 followerSnapshot.put("1", "A");
821 followerSnapshot.put("2", "B");
822 followerSnapshot.put("3", "C");
824 return toByteString(followerSnapshot);
/**
 * Runs the inherited check and then additionally asserts the recorded vote: it must be the
 * candidate id when {@code rpc} is a {@link RequestVote}, and null for any other RPC.
 *
 * @param actorContext context whose term information is inspected
 * @param actorRef passed through to the inherited check
 * @param rpc the RPC carrying the newer term
 */
828 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
829 ActorRef actorRef, RaftRPC rpc) throws Exception {
830 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
832 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
833 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
837 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
839 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
840 assertEquals("isSuccess", true, reply.isSuccess());