1 package org.opendaylight.controller.cluster.raft.behaviors;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.testkit.TestActorRef;
10 import com.google.protobuf.ByteString;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.HashMap;
14 import java.util.List;
15 import org.junit.After;
16 import org.junit.Assert;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.Snapshot;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
24 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
// Test suite for the Follower Raft behavior, layered on AbstractRaftActorBehaviorTest.
// NOTE(review): actorFactory is not declared in this excerpt — presumably inherited from the base class.
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
// Message-collecting actor handed to the mock context by createActorContext(); the tests read
// ElectionTimeout, FollowerInitialSyncUpStatus and ApplySnapshot messages from it.
36 private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
37 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
// Message-collecting actor passed as the sender to handleMessage(); RPC replies are looked up on it.
39 private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
40 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
// The behavior under test; each test case assigns it.
42 private RaftActorBehavior follower;
// Per-test teardown (presumably the JUnit @After hook; the annotation is not visible in this excerpt).
// It only acts when a test actually assigned the follower field; the guarded body is outside this view.
46 public void tearDown() throws Exception {
47 if(follower != null) {
// Supplies the behavior under test: a new Follower bound to the given actor context.
55 protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
56 return new Follower(actorContext);
// Creates the default mock context, backed by this test's followerActor.
60 protected MockRaftActorContext createActorContext() {
61 return createActorContext(followerActor);
// Creates a mock context with the id "follower", the value of getSystem() and the supplied actor reference.
65 protected MockRaftActorContext createActorContext(ActorRef actorRef){
66 return new MockRaftActorContext("follower", getSystem(), actorRef);
// Verifies that merely constructing a Follower leads to an ElectionTimeout message arriving at followerActor.
70 public void testThatAnElectionTimeoutIsTriggered(){
71 MockRaftActorContext actorContext = createActorContext();
72 follower = new Follower(actorContext);
// The third argument is six election-timeout intervals in milliseconds — presumably the maximum
// wait time; confirm against MessageCollectorActor.expectFirstMatching.
74 MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
75 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
// Verifies that handling an ElectionTimeout makes the follower return a Candidate behavior.
79 public void testHandleElectionTimeout(){
80 logStart("testHandleElectionTimeout");
82 follower = new Follower(createActorContext());
84 RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
86 assertTrue(raftBehavior instanceof Candidate);
// Verifies that a RequestVote carrying the follower's current term is granted when the follower
// has not voted yet (votedFor is null), and that the reply echoes that term.
90 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
91 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
93 RaftActorContext context = createActorContext();
// NOTE(review): `term` is declared on a line that is not visible in this excerpt.
95 context.getTermInformation().update(term, null);
97 follower = createBehavior(context);
99 follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
// The vote reply is expected on leaderActor, the sender passed to handleMessage above.
101 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
103 assertEquals("isVoteGranted", true, reply.isVoteGranted());
104 assertEquals("getTerm", term, reply.getTerm());
// Verifies that a RequestVote for the current term is refused when the follower has already
// voted for a different candidate ("test") than the one asking ("candidate").
108 public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
109 logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
111 RaftActorContext context = createActorContext();
// NOTE(review): `term` is declared on a line that is not visible in this excerpt.
113 context.getTermInformation().update(term, "test");
115 follower = createBehavior(context);
117 follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
119 RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
121 assertEquals("isVoteGranted", false, reply.isVoteGranted());
// Verifies that the first AppendEntries handled by a fresh follower produces a
// FollowerInitialSyncUpStatus on followerActor reporting that initial sync is not done.
126 public void testHandleFirstAppendEntries() throws Exception {
127 logStart("testHandleFirstAppendEntries");
129 MockRaftActorContext context = createActorContext();
131 List<ReplicatedLogEntry> entries = Arrays.asList(
132 newReplicatedLogEntry(2, 101, "foo"));
134 // The new commitIndex is 101
135 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
137 follower = createBehavior(context);
138 follower.handleMessage(leaderActor, appendEntries);
140 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
142 assertFalse(syncStatus.isInitialSyncDone());
// Walks the follower through initial sync-up in three steps:
//   1. the first AppendEntries yields a status with isInitialSyncDone() == false;
//   2. after the context is advanced to index 101, a second AppendEntries yields a status that is done;
//   3. replaying that second message yields no further status message.
146 public void testHandleSyncUpAppendEntries() throws Exception {
147 logStart("testHandleSyncUpAppendEntries");
149 MockRaftActorContext context = createActorContext();
151 List<ReplicatedLogEntry> entries = Arrays.asList(
152 newReplicatedLogEntry(2, 101, "foo"));
154 // The new commitIndex is 101
155 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
157 follower = createBehavior(context);
158 follower.handleMessage(leaderActor, appendEntries);
160 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
162 assertFalse(syncStatus.isInitialSyncDone());
164 // Clear all the messages
165 followerActor.underlyingActor().clear();
// Bring the follower's own state up to index 101 before the next message.
167 context.setLastApplied(101);
168 context.setCommitIndex(101);
169 setLastLogEntry(context, 1, 101,
170 new MockRaftActorContext.MockPayload(""));
172 entries = Arrays.asList(
173 newReplicatedLogEntry(2, 101, "foo"));
175 // The new commitIndex is 101
// NOTE(review): the sixth argument below is 102, not 101 as the comment above says — the comment
// looks copied from the first message; confirm which value is intended.
176 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
177 follower.handleMessage(leaderActor, appendEntries);
179 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
181 assertTrue(syncStatus.isInitialSyncDone());
183 followerActor.underlyingActor().clear();
185 // Sending the same message again should not generate another message
187 follower.handleMessage(leaderActor, appendEntries);
// getFirstMatching (rather than expectFirstMatching) is used here and the result is asserted to be null.
189 syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
191 assertNull(syncStatus);
// Verifies that while initial sync is still incomplete, an AppendEntries naming a different
// leader ("leader-2") produces a fresh FollowerInitialSyncUpStatus that is again not done.
196 public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
197 logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
199 MockRaftActorContext context = createActorContext();
201 List<ReplicatedLogEntry> entries = Arrays.asList(
202 newReplicatedLogEntry(2, 101, "foo"));
204 // The new commitIndex is 101
205 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
207 follower = createBehavior(context);
208 follower.handleMessage(leaderActor, appendEntries);
210 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
212 assertFalse(syncStatus.isInitialSyncDone());
214 // Clear all the messages
215 followerActor.underlyingActor().clear();
217 context.setLastApplied(100);
218 setLastLogEntry(context, 1, 100,
219 new MockRaftActorContext.MockPayload(""));
221 entries = Arrays.asList(
222 newReplicatedLogEntry(2, 101, "foo"));
224 // leader-2 is becoming the leader now and it says the commitIndex is 45
// NOTE(review): in the message below 45 is the third argument and 46 the sixth; the first message
// in this test puts its commitIndex in the sixth position — confirm which value the comment means.
225 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
226 follower.handleMessage(leaderActor, appendEntries);
228 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
230 // We get a new message saying initial status is not done
231 assertFalse(syncStatus.isInitialSyncDone());
// Verifies that after initial sync has been reported as done, an AppendEntries naming a different
// leader ("leader-2") makes the follower report initial sync as not done again.
237 public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
238 logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
240 MockRaftActorContext context = createActorContext();
242 List<ReplicatedLogEntry> entries = Arrays.asList(
243 newReplicatedLogEntry(2, 101, "foo"));
245 // The new commitIndex is 101
246 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
248 follower = createBehavior(context);
249 follower.handleMessage(leaderActor, appendEntries);
251 FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
253 assertFalse(syncStatus.isInitialSyncDone());
255 // Clear all the messages
256 followerActor.underlyingActor().clear();
// Bring the follower's own state up to index 101 so the next message completes the sync.
258 context.setLastApplied(101);
259 context.setCommitIndex(101);
260 setLastLogEntry(context, 1, 101,
261 new MockRaftActorContext.MockPayload(""));
263 entries = Arrays.asList(
264 newReplicatedLogEntry(2, 101, "foo"));
266 // The new commitIndex is 101
// NOTE(review): the sixth argument below is 102, not 101 as the comment above says — confirm.
267 appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
268 follower.handleMessage(leaderActor, appendEntries);
270 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
272 assertTrue(syncStatus.isInitialSyncDone());
274 // Clear all the messages
275 followerActor.underlyingActor().clear();
277 context.setLastApplied(100);
278 setLastLogEntry(context, 1, 100,
279 new MockRaftActorContext.MockPayload(""));
281 entries = Arrays.asList(
282 newReplicatedLogEntry(2, 101, "foo"));
284 // leader-2 is becoming the leader now and it says the commitIndex is 45
// NOTE(review): 45 is the third argument below and 46 the sixth — confirm which value the comment means.
285 appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
286 follower.handleMessage(leaderActor, appendEntries);
288 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
290 // We get a new message saying initial status is not done
291 assertFalse(syncStatus.isInitialSyncDone());
297 * This test verifies that when an AppendEntries RPC is received by a RaftActor
298 * with a commitIndex that is greater than what has been applied to the
299 * state machine of the RaftActor, the RaftActor applies the state and
300 * sets its current applied state to the commitIndex of the sender.
// Starts with lastApplied at 100, handles an AppendEntries whose commit index is 101, and
// asserts that the context's lastApplied has advanced to 101.
305 public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
306 logStart("testHandleAppendEntriesWithNewerCommitIndex");
308 MockRaftActorContext context = createActorContext();
310 context.setLastApplied(100);
311 setLastLogEntry(context, 1, 100,
312 new MockRaftActorContext.MockPayload(""));
313 context.getReplicatedLog().setSnapshotIndex(99);
315 List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
316 newReplicatedLogEntry(2, 101, "foo"));
318 // The new commitIndex is 101
319 AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
321 follower = createBehavior(context);
322 follower.handleMessage(leaderActor, appendEntries);
324 assertEquals("getLastApplied", 101L, context.getLastApplied());
328 * This test verifies that when an AppendEntries is received with a specific prevLogTerm
329 * which does not match the term that is in the RaftActor's log entry at prevLogIndex
330 * then the RaftActor does not change its state and it returns a failure.
// Sends an AppendEntries with term 100 and null entries to a follower whose term was set to 95,
// then asserts that the same behavior instance is returned and the reply reports failure.
335 public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
336 logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
338 MockRaftActorContext context = createActorContext();
340 // First set the receivers term to lower number
341 context.getTermInformation().update(95, "test");
343 // AppendEntries is now sent with a bigger term
344 // this will set the receivers term to be the same as the sender's term
345 AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
347 follower = createBehavior(context);
349 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
// The follower must stay a follower: the very same behavior instance comes back.
351 Assert.assertSame(follower, newBehavior);
353 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
354 AppendEntriesReply.class);
356 assertEquals("isSuccess", false, reply.isSuccess());
360 * This test verifies that when a new AppendEntries message is received with
361 * new entries and the logs of the sender and receiver match that the new
362 * entries get added to the log and the log is incremented by the number of
363 * entries received in appendEntries
// Seeds the receiver log with indices 0..2, sends entries 3 and 4, and asserts that both were
// appended, the behavior is unchanged, and a successful reply (last term 1, last index 4) was sent.
368 public void testHandleAppendEntriesAddNewEntries() {
369 logStart("testHandleAppendEntriesAddNewEntries");
371 MockRaftActorContext context = createActorContext();
373 // First set the receivers term to lower number
374 context.getTermInformation().update(1, "test");
376 // Prepare the receivers log
377 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
378 log.append(newReplicatedLogEntry(1, 0, "zero"));
379 log.append(newReplicatedLogEntry(1, 1, "one"));
380 log.append(newReplicatedLogEntry(1, 2, "two"));
382 context.setReplicatedLog(log);
384 // Prepare the entries to be sent with AppendEntries
385 List<ReplicatedLogEntry> entries = new ArrayList<>();
386 entries.add(newReplicatedLogEntry(1, 3, "three"));
387 entries.add(newReplicatedLogEntry(1, 4, "four"));
389 // Send appendEntries with the same term as was set on the receiver
390 // before the new behavior was created (1 in this case)
391 // This will not work for a Candidate because as soon as a Candidate
392 // is created it increments the term
393 AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
395 follower = createBehavior(context);
397 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
399 Assert.assertSame(follower, newBehavior);
// The assertions read the local `log` object that was installed on the context above.
401 assertEquals("Next index", 5, log.last().getIndex() + 1);
402 assertEquals("Entry 3", entries.get(0), log.get(3));
403 assertEquals("Entry 4", entries.get(1), log.get(4));
405 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
409 * This test verifies that when a new AppendEntries message is received with
410 * new entries and the logs of the sender and receiver are out-of-sync that
411 * the log is first corrected by removing the out of sync entries from the
412 * log and then adding in the new entries sent with the AppendEntries message
// Seeds the receiver log with term-1 entries at indices 0..2, then sends term-2 entries for
// indices 2 and 3. Asserts that index 2 now holds the sender's entry, index 3 was added,
// index 1 is untouched, and a successful reply (last term 2, last index 3) was sent.
415 public void testHandleAppendEntriesCorrectReceiverLogEntries() {
416 logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
418 MockRaftActorContext context = createActorContext();
420 // First set the receivers term to lower number
421 context.getTermInformation().update(1, "test");
423 // Prepare the receivers log
424 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
425 log.append(newReplicatedLogEntry(1, 0, "zero"));
426 log.append(newReplicatedLogEntry(1, 1, "one"));
427 log.append(newReplicatedLogEntry(1, 2, "two"));
429 context.setReplicatedLog(log);
431 // Prepare the entries to be sent with AppendEntries
432 List<ReplicatedLogEntry> entries = new ArrayList<>();
433 entries.add(newReplicatedLogEntry(2, 2, "two-1"));
434 entries.add(newReplicatedLogEntry(2, 3, "three"));
436 // Send appendEntries with the same term as was set on the receiver
437 // before the new behavior was created (1 in this case)
438 // This will not work for a Candidate because as soon as a Candidate
439 // is created it increments the term
// NOTE(review): the four comment lines above do not match this call — the message is sent with
// term 2 while the receiver's term was set to 1; the comment appears copied from another test.
440 AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
442 follower = createBehavior(context);
444 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
446 Assert.assertSame(follower, newBehavior);
448 // The entry at index 2 will be found out-of-sync with the leader
449 // and will be removed
450 // Then the two new entries will be added to the log
451 // Thus making the log to have 4 entries
452 assertEquals("Next index", 4, log.last().getIndex() + 1);
453 //assertEquals("Entry 2", entries.get(0), log.get(2));
455 assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
457 // Check that the entry at index 2 has the new data
458 assertEquals("Entry 2", entries.get(0), log.get(2));
460 assertEquals("Entry 3", entries.get(1), log.get(3));
462 expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
// Seeds the receiver log with indices 0..2 and sends an AppendEntries carrying entry 4 only.
// Asserts that the behavior is unchanged and that a failure reply reporting the receiver's
// own last entry (term 1, index 2) was sent.
466 public void testHandleAppendEntriesPreviousLogEntryMissing(){
467 logStart("testHandleAppendEntriesPreviousLogEntryMissing");
469 MockRaftActorContext context = createActorContext();
471 // Prepare the receivers log
472 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
473 log.append(newReplicatedLogEntry(1, 0, "zero"));
474 log.append(newReplicatedLogEntry(1, 1, "one"));
475 log.append(newReplicatedLogEntry(1, 2, "two"));
477 context.setReplicatedLog(log);
479 // Prepare the entries to be sent with AppendEntries
480 List<ReplicatedLogEntry> entries = new ArrayList<>();
481 entries.add(newReplicatedLogEntry(1, 4, "four"));
// Presumably the third argument (3) is the previous log index, which the receiver's log
// (indices 0..2) does not contain — confirm against the AppendEntries constructor.
483 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
485 follower = createBehavior(context);
487 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
489 Assert.assertSame(follower, newBehavior);
491 expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
// Verifies that re-sending an entry the receiver already holds is accepted without growing the
// log, and that re-sending it together with one new entry appends only the new one.
495 public void testHandleAppendEntriesWithExistingLogEntry() {
496 logStart("testHandleAppendEntriesWithExistingLogEntry");
498 MockRaftActorContext context = createActorContext();
500 context.getTermInformation().update(1, "test");
502 // Prepare the receivers log
503 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
504 log.append(newReplicatedLogEntry(1, 0, "zero"));
505 log.append(newReplicatedLogEntry(1, 1, "one"));
507 context.setReplicatedLog(log);
509 // Send the last entry again.
510 List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
512 follower = createBehavior(context);
514 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
// The log must still end at index 1 after the duplicate.
516 assertEquals("Next index", 2, log.last().getIndex() + 1);
517 assertEquals("Entry 1", entries.get(0), log.get(1));
519 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
521 // Send the last entry again and also a new one.
523 entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
// Drop the first reply so the next verification sees only the reply to the second message.
525 leaderActor.underlyingActor().clear();
526 follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
528 assertEquals("Next index", 3, log.last().getIndex() + 1);
529 assertEquals("Entry 1", entries.get(0), log.get(1));
530 assertEquals("Entry 2", entries.get(1), log.get(2));
532 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
// Uses an empty log whose snapshot index/term are 3/1, sends an AppendEntries carrying entry 4,
// and asserts the behavior is unchanged and a successful reply (last term 1, last index 4) was sent.
536 public void testHandleAppendEntriesAfterInstallingSnapshot(){
// NOTE(review): the label passed to logStart omits "Entries" and so differs from the method name.
537 logStart("testHandleAppendAfterInstallingSnapshot");
539 MockRaftActorContext context = createActorContext();
541 // Prepare the receivers log
542 MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
544 // Set up a log as if it has been snapshotted
545 log.setSnapshotIndex(3);
546 log.setSnapshotTerm(1);
548 context.setReplicatedLog(log);
550 // Prepare the entries to be sent with AppendEntries
551 List<ReplicatedLogEntry> entries = new ArrayList<>();
552 entries.add(newReplicatedLogEntry(1, 4, "four"));
554 AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
556 follower = createBehavior(context);
558 RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
560 Assert.assertSame(follower, newBehavior);
562 expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
567 * This test verifies that when InstallSnapshot is received by
568 * the follower it is applied correctly.
// Serializes a small map, feeds it to the follower as a sequence of InstallSnapshot chunks, and
// then checks three things: the ApplySnapshot delivered to followerActor, the per-chunk
// InstallSnapshotReply messages on leaderActor, and that the snapshot tracker has been cleared.
// NOTE(review): chunkSize, chunkIndex and offset are declared or updated on lines that are not
// visible in this excerpt.
573 public void testHandleInstallSnapshot() throws Exception {
574 logStart("testHandleInstallSnapshot");
576 MockRaftActorContext context = createActorContext();
578 follower = createBehavior(context);
580 HashMap<String, String> followerSnapshot = new HashMap<>();
581 followerSnapshot.put("1", "A");
582 followerSnapshot.put("2", "B");
583 followerSnapshot.put("3", "C");
585 ByteString bsSnapshot = toByteString(followerSnapshot);
587 int snapshotLength = bsSnapshot.size();
// Integer ceiling division: one extra chunk when the length is not a multiple of chunkSize.
589 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
590 int lastIncludedIndex = 1;
592 InstallSnapshot lastInstallSnapshot = null;
594 for(int i = 0; i < totalChunks; i++) {
595 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
596 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
597 chunkData, chunkIndex, totalChunks);
598 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the step is a literal 50 rather than chunkSize; the two presumably match — confirm.
599 offset = offset + 50;
// The assembled snapshot is expected to mirror the index/term of the last chunk message and to
// carry exactly the serialized bytes.
604 ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
605 ApplySnapshot.class);
606 Snapshot snapshot = applySnapshot.getSnapshot();
607 assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
608 assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
609 snapshot.getLastAppliedTerm());
610 assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
611 snapshot.getLastAppliedIndex());
612 assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
613 Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
// One reply per chunk is expected on leaderActor.
615 List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
616 leaderActor, InstallSnapshotReply.class);
617 assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
620 for(InstallSnapshotReply reply: replies) {
// chunkIndex is post-incremented here, so consecutive replies must carry consecutive chunk indexes.
621 assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
622 assertEquals("getTerm", 1, reply.getTerm());
623 assertEquals("isSuccess", true, reply.isSuccess());
624 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
627 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
// Feeds the follower a chunked snapshot and asserts that initial sync is reported as not done.
// It then advances the context to index 101, sends an AppendEntries, and asserts that initial
// sync is reported as done.
// NOTE(review): chunkSize, chunkIndex and offset are declared or updated on lines that are not
// visible in this excerpt.
631 public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
// NOTE(review): the label passed to logStart is shorter than the method name.
632 logStart("testInitialSyncUpWithHandleInstallSnapshot");
634 MockRaftActorContext context = createActorContext();
636 follower = createBehavior(context);
638 HashMap<String, String> followerSnapshot = new HashMap<>();
639 followerSnapshot.put("1", "A");
640 followerSnapshot.put("2", "B");
641 followerSnapshot.put("3", "C");
643 ByteString bsSnapshot = toByteString(followerSnapshot);
645 int snapshotLength = bsSnapshot.size();
// Integer ceiling division: one extra chunk when the length is not a multiple of chunkSize.
647 int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
648 int lastIncludedIndex = 1;
650 InstallSnapshot lastInstallSnapshot = null;
652 for(int i = 0; i < totalChunks; i++) {
653 ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
654 lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
655 chunkData, chunkIndex, totalChunks);
656 follower.handleMessage(leaderActor, lastInstallSnapshot);
// NOTE(review): the step is a literal 50 rather than chunkSize; the two presumably match — confirm.
657 offset = offset + 50;
662 FollowerInitialSyncUpStatus syncStatus =
663 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
665 assertFalse(syncStatus.isInitialSyncDone());
667 // Clear all the messages
668 followerActor.underlyingActor().clear();
670 context.setLastApplied(101);
671 context.setCommitIndex(101);
672 setLastLogEntry(context, 1, 101,
673 new MockRaftActorContext.MockPayload(""));
675 List<ReplicatedLogEntry> entries = Arrays.asList(
676 newReplicatedLogEntry(2, 101, "foo"));
678 // The new commitIndex is 101
// NOTE(review): the sixth argument below is 102, not 101 as the comment above says — confirm.
679 AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
680 follower.handleMessage(leaderActor, appendEntries);
682 syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
684 assertTrue(syncStatus.isInitialSyncDone());
// Sends a single InstallSnapshot whose chunk index is 3 of 3 as the very first chunk and asserts
// that the follower replies with failure and chunk index -1 and holds no snapshot tracker afterwards.
688 public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
689 logStart("testHandleOutOfSequenceInstallSnapshot");
691 MockRaftActorContext context = createActorContext();
693 follower = createBehavior(context);
695 HashMap<String, String> followerSnapshot = new HashMap<>();
696 followerSnapshot.put("1", "A");
697 followerSnapshot.put("2", "B");
698 followerSnapshot.put("3", "C");
700 ByteString bsSnapshot = toByteString(followerSnapshot);
// The chunk is cut at offset 10 with chunk size 50 and labelled as chunk 3 out of 3.
702 InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
703 getNextChunk(bsSnapshot, 10, 50), 3, 3);
704 follower.handleMessage(leaderActor, installSnapshot);
706 InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
707 InstallSnapshotReply.class);
709 assertEquals("isSuccess", false, reply.isSuccess());
710 assertEquals("getChunkIndex", -1, reply.getChunkIndex());
711 assertEquals("getTerm", 1, reply.getTerm());
712 assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
714 assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
// Returns the slice of bs that begins at `start` and is `size` bytes long, where size defaults
// to chunkSize and is reduced so the slice never runs past the end of bs.
// NOTE(review): `start` is declared on a line not visible in this excerpt (presumably it is
// initialised from offset), and how the two size checks below are nested is not visible either.
717 public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
718 int snapshotLength = bs.size();
720 int size = chunkSize;
// A chunk larger than the whole snapshot is capped at the snapshot length.
721 if (chunkSize > snapshotLength) {
722 size = snapshotLength;
// A chunk that would run past the end is shortened to the bytes remaining after start.
724 if ((start + chunkSize) > snapshotLength) {
725 size = snapshotLength - start;
728 return bs.substring(start, start + size);
// Takes the first AppendEntriesReply collected by leaderActor and asserts its success flag,
// term, follower id, last log term and last log index against the expected values.
731 private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
732 String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
734 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
735 AppendEntriesReply.class);
737 assertEquals("isSuccess", expSuccess, reply.isSuccess());
738 assertEquals("getTerm", expTerm, reply.getTerm());
739 assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
740 assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
741 assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
// Convenience factory: builds a MockReplicatedLogEntry with the given term and index whose
// payload is a MockPayload wrapping `data`.
744 private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
745 return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
746 new MockRaftActorContext.MockPayload(data));
// Runs the superclass check and then additionally asserts votedFor in the context's term
// information: the candidate id when the RPC is a RequestVote, otherwise null.
750 protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
751 ActorRef actorRef, RaftRPC rpc) throws Exception {
752 super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
754 String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
755 assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
// Expects an AppendEntriesReply on replyActor and asserts that it reports success.
// NOTE(review): the rest of the signature (after the parameter list) is on a line not visible in this excerpt.
759 protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
761 AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
762 assertEquals("isSuccess", true, reply.isSuccess());