Do not use ActorSystem.actorFor as it is deprecated
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.testkit.TestActorRef;
10 import com.google.protobuf.ByteString;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.HashMap;
14 import java.util.List;
15 import org.junit.After;
16 import org.junit.Assert;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.Snapshot;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
24 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33
34 public class FollowerTest extends AbstractRaftActorBehaviorTest {
35
    // Message-collecting test actor that backs the follower's MockRaftActorContext
    // (see createActorContext()); tests read the follower's self-sent messages from it.
    private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));

    // Message-collecting test actor passed as the first argument of handleMessage
    // (presumably the sender); the tests read the follower's replies back from it.
    private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));

    // Behavior under test; assigned by each test and closed in tearDown() when non-null.
    private RaftActorBehavior follower;
43
44     @Override
45     @After
46     public void tearDown() throws Exception {
47         if(follower != null) {
48             follower.close();
49         }
50
51         super.tearDown();
52     }
53
54     @Override
55     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
56         return new Follower(actorContext);
57     }
58
59     @Override
60     protected  MockRaftActorContext createActorContext() {
61         return createActorContext(followerActor);
62     }
63
64     @Override
65     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
66         return new MockRaftActorContext("follower", getSystem(), actorRef);
67     }
68
69     @Test
70     public void testThatAnElectionTimeoutIsTriggered(){
71         MockRaftActorContext actorContext = createActorContext();
72         follower = new Follower(actorContext);
73
74         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
75                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
76     }
77
78     @Test
79     public void testHandleElectionTimeout(){
80         logStart("testHandleElectionTimeout");
81
82         follower = new Follower(createActorContext());
83
84         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
85
86         assertTrue(raftBehavior instanceof Candidate);
87     }
88
89     @Test
90     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
91         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
92
93         RaftActorContext context = createActorContext();
94         long term = 1000;
95         context.getTermInformation().update(term, null);
96
97         follower = createBehavior(context);
98
99         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
100
101         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
102
103         assertEquals("isVoteGranted", true, reply.isVoteGranted());
104         assertEquals("getTerm", term, reply.getTerm());
105     }
106
107     @Test
108     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
109         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
110
111         RaftActorContext context = createActorContext();
112         long term = 1000;
113         context.getTermInformation().update(term, "test");
114
115         follower = createBehavior(context);
116
117         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
118
119         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
120
121         assertEquals("isVoteGranted", false, reply.isVoteGranted());
122     }
123
124
125     @Test
126     public void testHandleFirstAppendEntries() throws Exception {
127         logStart("testHandleFirstAppendEntries");
128
129         MockRaftActorContext context = createActorContext();
130
131         List<ReplicatedLogEntry> entries = Arrays.asList(
132                 newReplicatedLogEntry(2, 101, "foo"));
133
134         // The new commitIndex is 101
135         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
136
137         follower = createBehavior(context);
138         follower.handleMessage(leaderActor, appendEntries);
139
140         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
141
142         assertFalse(syncStatus.isInitialSyncDone());
143     }
144
145     @Test
146     public void testHandleSyncUpAppendEntries() throws Exception {
147         logStart("testHandleSyncUpAppendEntries");
148
149         MockRaftActorContext context = createActorContext();
150
151         List<ReplicatedLogEntry> entries = Arrays.asList(
152                 newReplicatedLogEntry(2, 101, "foo"));
153
154         // The new commitIndex is 101
155         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
156
157         follower = createBehavior(context);
158         follower.handleMessage(leaderActor, appendEntries);
159
160         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
161
162         assertFalse(syncStatus.isInitialSyncDone());
163
164         // Clear all the messages
165         followerActor.underlyingActor().clear();
166
167         context.setLastApplied(101);
168         context.setCommitIndex(101);
169         setLastLogEntry(context, 1, 101,
170                 new MockRaftActorContext.MockPayload(""));
171
172         entries = Arrays.asList(
173                 newReplicatedLogEntry(2, 101, "foo"));
174
175         // The new commitIndex is 101
176         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
177         follower.handleMessage(leaderActor, appendEntries);
178
179         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
180
181         assertTrue(syncStatus.isInitialSyncDone());
182
183         followerActor.underlyingActor().clear();
184
185         // Sending the same message again should not generate another message
186
187         follower.handleMessage(leaderActor, appendEntries);
188
189         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
190
191         assertNull(syncStatus);
192
193     }
194
195     @Test
196     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
197         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
198
199         MockRaftActorContext context = createActorContext();
200
201         List<ReplicatedLogEntry> entries = Arrays.asList(
202                 newReplicatedLogEntry(2, 101, "foo"));
203
204         // The new commitIndex is 101
205         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
206
207         follower = createBehavior(context);
208         follower.handleMessage(leaderActor, appendEntries);
209
210         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
211
212         assertFalse(syncStatus.isInitialSyncDone());
213
214         // Clear all the messages
215         followerActor.underlyingActor().clear();
216
217         context.setLastApplied(100);
218         setLastLogEntry(context, 1, 100,
219                 new MockRaftActorContext.MockPayload(""));
220
221         entries = Arrays.asList(
222                 newReplicatedLogEntry(2, 101, "foo"));
223
224         // leader-2 is becoming the leader now and it says the commitIndex is 45
225         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
226         follower.handleMessage(leaderActor, appendEntries);
227
228         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
229
230         // We get a new message saying initial status is not done
231         assertFalse(syncStatus.isInitialSyncDone());
232
233     }
234
235
236     @Test
237     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
238         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
239
240         MockRaftActorContext context = createActorContext();
241
242         List<ReplicatedLogEntry> entries = Arrays.asList(
243                 newReplicatedLogEntry(2, 101, "foo"));
244
245         // The new commitIndex is 101
246         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
247
248         follower = createBehavior(context);
249         follower.handleMessage(leaderActor, appendEntries);
250
251         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
252
253         assertFalse(syncStatus.isInitialSyncDone());
254
255         // Clear all the messages
256         followerActor.underlyingActor().clear();
257
258         context.setLastApplied(101);
259         context.setCommitIndex(101);
260         setLastLogEntry(context, 1, 101,
261                 new MockRaftActorContext.MockPayload(""));
262
263         entries = Arrays.asList(
264                 newReplicatedLogEntry(2, 101, "foo"));
265
266         // The new commitIndex is 101
267         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
268         follower.handleMessage(leaderActor, appendEntries);
269
270         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
271
272         assertTrue(syncStatus.isInitialSyncDone());
273
274         // Clear all the messages
275         followerActor.underlyingActor().clear();
276
277         context.setLastApplied(100);
278         setLastLogEntry(context, 1, 100,
279                 new MockRaftActorContext.MockPayload(""));
280
281         entries = Arrays.asList(
282                 newReplicatedLogEntry(2, 101, "foo"));
283
284         // leader-2 is becoming the leader now and it says the commitIndex is 45
285         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
286         follower.handleMessage(leaderActor, appendEntries);
287
288         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
289
290         // We get a new message saying initial status is not done
291         assertFalse(syncStatus.isInitialSyncDone());
292
293     }
294
295
296     /**
297      * This test verifies that when an AppendEntries RPC is received by a RaftActor
298      * with a commitIndex that is greater than what has been applied to the
299      * state machine of the RaftActor, the RaftActor applies the state and
300      * sets it current applied state to the commitIndex of the sender.
301      *
302      * @throws Exception
303      */
304     @Test
305     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
306         logStart("testHandleAppendEntriesWithNewerCommitIndex");
307
308         MockRaftActorContext context = createActorContext();
309
310         context.setLastApplied(100);
311         setLastLogEntry(context, 1, 100,
312                 new MockRaftActorContext.MockPayload(""));
313         context.getReplicatedLog().setSnapshotIndex(99);
314
315         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
316                 newReplicatedLogEntry(2, 101, "foo"));
317
318         // The new commitIndex is 101
319         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
320
321         follower = createBehavior(context);
322         follower.handleMessage(leaderActor, appendEntries);
323
324         assertEquals("getLastApplied", 101L, context.getLastApplied());
325     }
326
327     /**
328      * This test verifies that when an AppendEntries is received a specific prevLogTerm
329      * which does not match the term that is in RaftActors log entry at prevLogIndex
330      * then the RaftActor does not change it's state and it returns a failure.
331      *
332      * @throws Exception
333      */
334     @Test
335     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
336         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
337
338         MockRaftActorContext context = createActorContext();
339
340         // First set the receivers term to lower number
341         context.getTermInformation().update(95, "test");
342
343         // AppendEntries is now sent with a bigger term
344         // this will set the receivers term to be the same as the sender's term
345         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
346
347         follower = createBehavior(context);
348
349         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
350
351         Assert.assertSame(follower, newBehavior);
352
353         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
354                 AppendEntriesReply.class);
355
356         assertEquals("isSuccess", false, reply.isSuccess());
357     }
358
359     /**
360      * This test verifies that when a new AppendEntries message is received with
361      * new entries and the logs of the sender and receiver match that the new
362      * entries get added to the log and the log is incremented by the number of
363      * entries received in appendEntries
364      *
365      * @throws Exception
366      */
367     @Test
368     public void testHandleAppendEntriesAddNewEntries() {
369         logStart("testHandleAppendEntriesAddNewEntries");
370
371         MockRaftActorContext context = createActorContext();
372
373         // First set the receivers term to lower number
374         context.getTermInformation().update(1, "test");
375
376         // Prepare the receivers log
377         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
378         log.append(newReplicatedLogEntry(1, 0, "zero"));
379         log.append(newReplicatedLogEntry(1, 1, "one"));
380         log.append(newReplicatedLogEntry(1, 2, "two"));
381
382         context.setReplicatedLog(log);
383
384         // Prepare the entries to be sent with AppendEntries
385         List<ReplicatedLogEntry> entries = new ArrayList<>();
386         entries.add(newReplicatedLogEntry(1, 3, "three"));
387         entries.add(newReplicatedLogEntry(1, 4, "four"));
388
389         // Send appendEntries with the same term as was set on the receiver
390         // before the new behavior was created (1 in this case)
391         // This will not work for a Candidate because as soon as a Candidate
392         // is created it increments the term
393         AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
394
395         follower = createBehavior(context);
396
397         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
398
399         Assert.assertSame(follower, newBehavior);
400
401         assertEquals("Next index", 5, log.last().getIndex() + 1);
402         assertEquals("Entry 3", entries.get(0), log.get(3));
403         assertEquals("Entry 4", entries.get(1), log.get(4));
404
405         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
406     }
407
408     /**
409      * This test verifies that when a new AppendEntries message is received with
410      * new entries and the logs of the sender and receiver are out-of-sync that
411      * the log is first corrected by removing the out of sync entries from the
412      * log and then adding in the new entries sent with the AppendEntries message
413      */
414     @Test
415     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
416         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
417
418         MockRaftActorContext context = createActorContext();
419
420         // First set the receivers term to lower number
421         context.getTermInformation().update(1, "test");
422
423         // Prepare the receivers log
424         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
425         log.append(newReplicatedLogEntry(1, 0, "zero"));
426         log.append(newReplicatedLogEntry(1, 1, "one"));
427         log.append(newReplicatedLogEntry(1, 2, "two"));
428
429         context.setReplicatedLog(log);
430
431         // Prepare the entries to be sent with AppendEntries
432         List<ReplicatedLogEntry> entries = new ArrayList<>();
433         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
434         entries.add(newReplicatedLogEntry(2, 3, "three"));
435
436         // Send appendEntries with the same term as was set on the receiver
437         // before the new behavior was created (1 in this case)
438         // This will not work for a Candidate because as soon as a Candidate
439         // is created it increments the term
440         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
441
442         follower = createBehavior(context);
443
444         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
445
446         Assert.assertSame(follower, newBehavior);
447
448         // The entry at index 2 will be found out-of-sync with the leader
449         // and will be removed
450         // Then the two new entries will be added to the log
451         // Thus making the log to have 4 entries
452         assertEquals("Next index", 4, log.last().getIndex() + 1);
453         //assertEquals("Entry 2", entries.get(0), log.get(2));
454
455         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
456
457         // Check that the entry at index 2 has the new data
458         assertEquals("Entry 2", entries.get(0), log.get(2));
459
460         assertEquals("Entry 3", entries.get(1), log.get(3));
461
462         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
463     }
464
465     @Test
466     public void testHandleAppendEntriesPreviousLogEntryMissing(){
467         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
468
469         MockRaftActorContext context = createActorContext();
470
471         // Prepare the receivers log
472         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
473         log.append(newReplicatedLogEntry(1, 0, "zero"));
474         log.append(newReplicatedLogEntry(1, 1, "one"));
475         log.append(newReplicatedLogEntry(1, 2, "two"));
476
477         context.setReplicatedLog(log);
478
479         // Prepare the entries to be sent with AppendEntries
480         List<ReplicatedLogEntry> entries = new ArrayList<>();
481         entries.add(newReplicatedLogEntry(1, 4, "four"));
482
483         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
484
485         follower = createBehavior(context);
486
487         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
488
489         Assert.assertSame(follower, newBehavior);
490
491         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
492     }
493
494     @Test
495     public void testHandleAppendEntriesWithExistingLogEntry() {
496         logStart("testHandleAppendEntriesWithExistingLogEntry");
497
498         MockRaftActorContext context = createActorContext();
499
500         context.getTermInformation().update(1, "test");
501
502         // Prepare the receivers log
503         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
504         log.append(newReplicatedLogEntry(1, 0, "zero"));
505         log.append(newReplicatedLogEntry(1, 1, "one"));
506
507         context.setReplicatedLog(log);
508
509         // Send the last entry again.
510         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
511
512         follower = createBehavior(context);
513
514         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
515
516         assertEquals("Next index", 2, log.last().getIndex() + 1);
517         assertEquals("Entry 1", entries.get(0), log.get(1));
518
519         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
520
521         // Send the last entry again and also a new one.
522
523         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
524
525         leaderActor.underlyingActor().clear();
526         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
527
528         assertEquals("Next index", 3, log.last().getIndex() + 1);
529         assertEquals("Entry 1", entries.get(0), log.get(1));
530         assertEquals("Entry 2", entries.get(1), log.get(2));
531
532         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
533     }
534
535     @Test
536     public void testHandleAppendEntriesAfterInstallingSnapshot(){
537         logStart("testHandleAppendAfterInstallingSnapshot");
538
539         MockRaftActorContext context = createActorContext();
540
541         // Prepare the receivers log
542         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
543
544         // Set up a log as if it has been snapshotted
545         log.setSnapshotIndex(3);
546         log.setSnapshotTerm(1);
547
548         context.setReplicatedLog(log);
549
550         // Prepare the entries to be sent with AppendEntries
551         List<ReplicatedLogEntry> entries = new ArrayList<>();
552         entries.add(newReplicatedLogEntry(1, 4, "four"));
553
554         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
555
556         follower = createBehavior(context);
557
558         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
559
560         Assert.assertSame(follower, newBehavior);
561
562         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
563     }
564
565
566     /**
567      * This test verifies that when InstallSnapshot is received by
568      * the follower its applied correctly.
569      *
570      * @throws Exception
571      */
572     @Test
573     public void testHandleInstallSnapshot() throws Exception {
574         logStart("testHandleInstallSnapshot");
575
576         MockRaftActorContext context = createActorContext();
577
578         follower = createBehavior(context);
579
580         HashMap<String, String> followerSnapshot = new HashMap<>();
581         followerSnapshot.put("1", "A");
582         followerSnapshot.put("2", "B");
583         followerSnapshot.put("3", "C");
584
585         ByteString bsSnapshot  = toByteString(followerSnapshot);
586         int offset = 0;
587         int snapshotLength = bsSnapshot.size();
588         int chunkSize = 50;
589         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
590         int lastIncludedIndex = 1;
591         int chunkIndex = 1;
592         InstallSnapshot lastInstallSnapshot = null;
593
594         for(int i = 0; i < totalChunks; i++) {
595             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
596             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
597                     chunkData, chunkIndex, totalChunks);
598             follower.handleMessage(leaderActor, lastInstallSnapshot);
599             offset = offset + 50;
600             lastIncludedIndex++;
601             chunkIndex++;
602         }
603
604         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
605                 ApplySnapshot.class);
606         Snapshot snapshot = applySnapshot.getSnapshot();
607         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
608         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
609                 snapshot.getLastAppliedTerm());
610         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
611                 snapshot.getLastAppliedIndex());
612         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
613         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
614
615         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
616                 leaderActor, InstallSnapshotReply.class);
617         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
618
619         chunkIndex = 1;
620         for(InstallSnapshotReply reply: replies) {
621             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
622             assertEquals("getTerm", 1, reply.getTerm());
623             assertEquals("isSuccess", true, reply.isSuccess());
624             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
625         }
626
627         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
628     }
629
630     @Test
631     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
632         logStart("testInitialSyncUpWithHandleInstallSnapshot");
633
634         MockRaftActorContext context = createActorContext();
635
636         follower = createBehavior(context);
637
638         HashMap<String, String> followerSnapshot = new HashMap<>();
639         followerSnapshot.put("1", "A");
640         followerSnapshot.put("2", "B");
641         followerSnapshot.put("3", "C");
642
643         ByteString bsSnapshot  = toByteString(followerSnapshot);
644         int offset = 0;
645         int snapshotLength = bsSnapshot.size();
646         int chunkSize = 50;
647         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
648         int lastIncludedIndex = 1;
649         int chunkIndex = 1;
650         InstallSnapshot lastInstallSnapshot = null;
651
652         for(int i = 0; i < totalChunks; i++) {
653             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
654             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
655                     chunkData, chunkIndex, totalChunks);
656             follower.handleMessage(leaderActor, lastInstallSnapshot);
657             offset = offset + 50;
658             lastIncludedIndex++;
659             chunkIndex++;
660         }
661
662         FollowerInitialSyncUpStatus syncStatus =
663                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
664
665         assertFalse(syncStatus.isInitialSyncDone());
666
667         // Clear all the messages
668         followerActor.underlyingActor().clear();
669
670         context.setLastApplied(101);
671         context.setCommitIndex(101);
672         setLastLogEntry(context, 1, 101,
673                 new MockRaftActorContext.MockPayload(""));
674
675         List<ReplicatedLogEntry> entries = Arrays.asList(
676                 newReplicatedLogEntry(2, 101, "foo"));
677
678         // The new commitIndex is 101
679         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
680         follower.handleMessage(leaderActor, appendEntries);
681
682         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
683
684         assertTrue(syncStatus.isInitialSyncDone());
685     }
686
687     @Test
688     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
689         logStart("testHandleOutOfSequenceInstallSnapshot");
690
691         MockRaftActorContext context = createActorContext();
692
693         follower = createBehavior(context);
694
695         HashMap<String, String> followerSnapshot = new HashMap<>();
696         followerSnapshot.put("1", "A");
697         followerSnapshot.put("2", "B");
698         followerSnapshot.put("3", "C");
699
700         ByteString bsSnapshot = toByteString(followerSnapshot);
701
702         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
703                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
704         follower.handleMessage(leaderActor, installSnapshot);
705
706         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
707                 InstallSnapshotReply.class);
708
709         assertEquals("isSuccess", false, reply.isSuccess());
710         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
711         assertEquals("getTerm", 1, reply.getTerm());
712         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
713
714         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
715     }
716
717     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
718         int snapshotLength = bs.size();
719         int start = offset;
720         int size = chunkSize;
721         if (chunkSize > snapshotLength) {
722             size = snapshotLength;
723         } else {
724             if ((start + chunkSize) > snapshotLength) {
725                 size = snapshotLength - start;
726             }
727         }
728         return bs.substring(start, start + size);
729     }
730
731     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
732             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
733
734         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
735                 AppendEntriesReply.class);
736
737         assertEquals("isSuccess", expSuccess, reply.isSuccess());
738         assertEquals("getTerm", expTerm, reply.getTerm());
739         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
740         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
741         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
742     }
743
744     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
745         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
746                 new MockRaftActorContext.MockPayload(data));
747     }
748
749     @Override
750     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
751             ActorRef actorRef, RaftRPC rpc) throws Exception {
752         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
753
754         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
755         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
756     }
757
758     @Override
759     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
760             throws Exception {
761         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
762         assertEquals("isSuccess", true, reply.isSuccess());
763     }
764 }