Bug 3020: Add leader version to LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.Snapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
31 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40
41 public class FollowerTest extends AbstractRaftActorBehaviorTest {
42
43     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
44             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
45
46     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
47             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
48
49     private RaftActorBehavior follower;
50
51     private final short payloadVersion = 5;
52
53     @Override
54     @After
55     public void tearDown() throws Exception {
56         if(follower != null) {
57             follower.close();
58         }
59
60         super.tearDown();
61     }
62
63     @Override
64     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
65         return new Follower(actorContext);
66     }
67
68     @Override
69     protected  MockRaftActorContext createActorContext() {
70         return createActorContext(followerActor);
71     }
72
73     @Override
74     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
75         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
76         context.setPayloadVersion(payloadVersion );
77         return context;
78     }
79
80     @Test
81     public void testThatAnElectionTimeoutIsTriggered(){
82         MockRaftActorContext actorContext = createActorContext();
83         follower = new Follower(actorContext);
84
85         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
86                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
87     }
88
89     @Test
90     public void testHandleElectionTimeout(){
91         logStart("testHandleElectionTimeout");
92
93         follower = new Follower(createActorContext());
94
95         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
96
97         assertTrue(raftBehavior instanceof Candidate);
98     }
99
100     @Test
101     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
102         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
103
104         RaftActorContext context = createActorContext();
105         long term = 1000;
106         context.getTermInformation().update(term, null);
107
108         follower = createBehavior(context);
109
110         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
111
112         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
113
114         assertEquals("isVoteGranted", true, reply.isVoteGranted());
115         assertEquals("getTerm", term, reply.getTerm());
116     }
117
118     @Test
119     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
120         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
121
122         RaftActorContext context = createActorContext();
123         long term = 1000;
124         context.getTermInformation().update(term, "test");
125
126         follower = createBehavior(context);
127
128         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
129
130         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
131
132         assertEquals("isVoteGranted", false, reply.isVoteGranted());
133     }
134
135
136     @Test
137     public void testHandleFirstAppendEntries() throws Exception {
138         logStart("testHandleFirstAppendEntries");
139
140         MockRaftActorContext context = createActorContext();
141
142         List<ReplicatedLogEntry> entries = Arrays.asList(
143                 newReplicatedLogEntry(2, 101, "foo"));
144
145         // The new commitIndex is 101
146         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
147
148         follower = createBehavior(context);
149         follower.handleMessage(leaderActor, appendEntries);
150
151         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
152
153         assertFalse(syncStatus.isInitialSyncDone());
154     }
155
156     @Test
157     public void testHandleSyncUpAppendEntries() throws Exception {
158         logStart("testHandleSyncUpAppendEntries");
159
160         MockRaftActorContext context = createActorContext();
161
162         List<ReplicatedLogEntry> entries = Arrays.asList(
163                 newReplicatedLogEntry(2, 101, "foo"));
164
165         // The new commitIndex is 101
166         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
167
168         follower = createBehavior(context);
169         follower.handleMessage(leaderActor, appendEntries);
170
171         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
172
173         assertFalse(syncStatus.isInitialSyncDone());
174
175         // Clear all the messages
176         followerActor.underlyingActor().clear();
177
178         context.setLastApplied(101);
179         context.setCommitIndex(101);
180         setLastLogEntry(context, 1, 101,
181                 new MockRaftActorContext.MockPayload(""));
182
183         entries = Arrays.asList(
184                 newReplicatedLogEntry(2, 101, "foo"));
185
186         // The new commitIndex is 101
187         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
188         follower.handleMessage(leaderActor, appendEntries);
189
190         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
191
192         assertTrue(syncStatus.isInitialSyncDone());
193
194         followerActor.underlyingActor().clear();
195
196         // Sending the same message again should not generate another message
197
198         follower.handleMessage(leaderActor, appendEntries);
199
200         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
201
202         assertNull(syncStatus);
203
204     }
205
206     @Test
207     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
208         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
209
210         MockRaftActorContext context = createActorContext();
211
212         List<ReplicatedLogEntry> entries = Arrays.asList(
213                 newReplicatedLogEntry(2, 101, "foo"));
214
215         // The new commitIndex is 101
216         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
217
218         follower = createBehavior(context);
219         follower.handleMessage(leaderActor, appendEntries);
220
221         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
222
223         assertFalse(syncStatus.isInitialSyncDone());
224
225         // Clear all the messages
226         followerActor.underlyingActor().clear();
227
228         context.setLastApplied(100);
229         setLastLogEntry(context, 1, 100,
230                 new MockRaftActorContext.MockPayload(""));
231
232         entries = Arrays.asList(
233                 newReplicatedLogEntry(2, 101, "foo"));
234
235         // leader-2 is becoming the leader now and it says the commitIndex is 45
236         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
237         follower.handleMessage(leaderActor, appendEntries);
238
239         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
240
241         // We get a new message saying initial status is not done
242         assertFalse(syncStatus.isInitialSyncDone());
243
244     }
245
246
247     @Test
248     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
249         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
250
251         MockRaftActorContext context = createActorContext();
252
253         List<ReplicatedLogEntry> entries = Arrays.asList(
254                 newReplicatedLogEntry(2, 101, "foo"));
255
256         // The new commitIndex is 101
257         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
258
259         follower = createBehavior(context);
260         follower.handleMessage(leaderActor, appendEntries);
261
262         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
263
264         assertFalse(syncStatus.isInitialSyncDone());
265
266         // Clear all the messages
267         followerActor.underlyingActor().clear();
268
269         context.setLastApplied(101);
270         context.setCommitIndex(101);
271         setLastLogEntry(context, 1, 101,
272                 new MockRaftActorContext.MockPayload(""));
273
274         entries = Arrays.asList(
275                 newReplicatedLogEntry(2, 101, "foo"));
276
277         // The new commitIndex is 101
278         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
279         follower.handleMessage(leaderActor, appendEntries);
280
281         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
282
283         assertTrue(syncStatus.isInitialSyncDone());
284
285         // Clear all the messages
286         followerActor.underlyingActor().clear();
287
288         context.setLastApplied(100);
289         setLastLogEntry(context, 1, 100,
290                 new MockRaftActorContext.MockPayload(""));
291
292         entries = Arrays.asList(
293                 newReplicatedLogEntry(2, 101, "foo"));
294
295         // leader-2 is becoming the leader now and it says the commitIndex is 45
296         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
297         follower.handleMessage(leaderActor, appendEntries);
298
299         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
300
301         // We get a new message saying initial status is not done
302         assertFalse(syncStatus.isInitialSyncDone());
303
304     }
305
306
307     /**
308      * This test verifies that when an AppendEntries RPC is received by a RaftActor
309      * with a commitIndex that is greater than what has been applied to the
310      * state machine of the RaftActor, the RaftActor applies the state and
311      * sets it current applied state to the commitIndex of the sender.
312      *
313      * @throws Exception
314      */
315     @Test
316     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
317         logStart("testHandleAppendEntriesWithNewerCommitIndex");
318
319         MockRaftActorContext context = createActorContext();
320
321         context.setLastApplied(100);
322         setLastLogEntry(context, 1, 100,
323                 new MockRaftActorContext.MockPayload(""));
324         context.getReplicatedLog().setSnapshotIndex(99);
325
326         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
327                 newReplicatedLogEntry(2, 101, "foo"));
328
329         // The new commitIndex is 101
330         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
331
332         follower = createBehavior(context);
333         follower.handleMessage(leaderActor, appendEntries);
334
335         assertEquals("getLastApplied", 101L, context.getLastApplied());
336     }
337
338     /**
339      * This test verifies that when an AppendEntries is received a specific prevLogTerm
340      * which does not match the term that is in RaftActors log entry at prevLogIndex
341      * then the RaftActor does not change it's state and it returns a failure.
342      *
343      * @throws Exception
344      */
345     @Test
346     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
347         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
348
349         MockRaftActorContext context = createActorContext();
350
351         // First set the receivers term to lower number
352         context.getTermInformation().update(95, "test");
353
354         // AppendEntries is now sent with a bigger term
355         // this will set the receivers term to be the same as the sender's term
356         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
357
358         follower = createBehavior(context);
359
360         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
361
362         Assert.assertSame(follower, newBehavior);
363
364         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
365                 AppendEntriesReply.class);
366
367         assertEquals("isSuccess", false, reply.isSuccess());
368     }
369
370     /**
371      * This test verifies that when a new AppendEntries message is received with
372      * new entries and the logs of the sender and receiver match that the new
373      * entries get added to the log and the log is incremented by the number of
374      * entries received in appendEntries
375      *
376      * @throws Exception
377      */
378     @Test
379     public void testHandleAppendEntriesAddNewEntries() {
380         logStart("testHandleAppendEntriesAddNewEntries");
381
382         MockRaftActorContext context = createActorContext();
383
384         // First set the receivers term to lower number
385         context.getTermInformation().update(1, "test");
386
387         // Prepare the receivers log
388         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
389         log.append(newReplicatedLogEntry(1, 0, "zero"));
390         log.append(newReplicatedLogEntry(1, 1, "one"));
391         log.append(newReplicatedLogEntry(1, 2, "two"));
392
393         context.setReplicatedLog(log);
394
395         // Prepare the entries to be sent with AppendEntries
396         List<ReplicatedLogEntry> entries = new ArrayList<>();
397         entries.add(newReplicatedLogEntry(1, 3, "three"));
398         entries.add(newReplicatedLogEntry(1, 4, "four"));
399
400         // Send appendEntries with the same term as was set on the receiver
401         // before the new behavior was created (1 in this case)
402         // This will not work for a Candidate because as soon as a Candidate
403         // is created it increments the term
404         short leaderPayloadVersion = 10;
405         String leaderId = "leader-1";
406         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
407
408         follower = createBehavior(context);
409
410         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
411
412         Assert.assertSame(follower, newBehavior);
413
414         assertEquals("Next index", 5, log.last().getIndex() + 1);
415         assertEquals("Entry 3", entries.get(0), log.get(3));
416         assertEquals("Entry 4", entries.get(1), log.get(4));
417
418         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
419         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
420
421         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
422     }
423
424     /**
425      * This test verifies that when a new AppendEntries message is received with
426      * new entries and the logs of the sender and receiver are out-of-sync that
427      * the log is first corrected by removing the out of sync entries from the
428      * log and then adding in the new entries sent with the AppendEntries message
429      */
430     @Test
431     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
432         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
433
434         MockRaftActorContext context = createActorContext();
435
436         // First set the receivers term to lower number
437         context.getTermInformation().update(1, "test");
438
439         // Prepare the receivers log
440         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
441         log.append(newReplicatedLogEntry(1, 0, "zero"));
442         log.append(newReplicatedLogEntry(1, 1, "one"));
443         log.append(newReplicatedLogEntry(1, 2, "two"));
444
445         context.setReplicatedLog(log);
446
447         // Prepare the entries to be sent with AppendEntries
448         List<ReplicatedLogEntry> entries = new ArrayList<>();
449         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
450         entries.add(newReplicatedLogEntry(2, 3, "three"));
451
452         // Send appendEntries with the same term as was set on the receiver
453         // before the new behavior was created (1 in this case)
454         // This will not work for a Candidate because as soon as a Candidate
455         // is created it increments the term
456         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
457
458         follower = createBehavior(context);
459
460         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
461
462         Assert.assertSame(follower, newBehavior);
463
464         // The entry at index 2 will be found out-of-sync with the leader
465         // and will be removed
466         // Then the two new entries will be added to the log
467         // Thus making the log to have 4 entries
468         assertEquals("Next index", 4, log.last().getIndex() + 1);
469         //assertEquals("Entry 2", entries.get(0), log.get(2));
470
471         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
472
473         // Check that the entry at index 2 has the new data
474         assertEquals("Entry 2", entries.get(0), log.get(2));
475
476         assertEquals("Entry 3", entries.get(1), log.get(3));
477
478         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
479     }
480
481     @Test
482     public void testHandleAppendEntriesPreviousLogEntryMissing(){
483         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
484
485         MockRaftActorContext context = createActorContext();
486
487         // Prepare the receivers log
488         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
489         log.append(newReplicatedLogEntry(1, 0, "zero"));
490         log.append(newReplicatedLogEntry(1, 1, "one"));
491         log.append(newReplicatedLogEntry(1, 2, "two"));
492
493         context.setReplicatedLog(log);
494
495         // Prepare the entries to be sent with AppendEntries
496         List<ReplicatedLogEntry> entries = new ArrayList<>();
497         entries.add(newReplicatedLogEntry(1, 4, "four"));
498
499         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
500
501         follower = createBehavior(context);
502
503         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
504
505         Assert.assertSame(follower, newBehavior);
506
507         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
508     }
509
510     @Test
511     public void testHandleAppendEntriesWithExistingLogEntry() {
512         logStart("testHandleAppendEntriesWithExistingLogEntry");
513
514         MockRaftActorContext context = createActorContext();
515
516         context.getTermInformation().update(1, "test");
517
518         // Prepare the receivers log
519         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
520         log.append(newReplicatedLogEntry(1, 0, "zero"));
521         log.append(newReplicatedLogEntry(1, 1, "one"));
522
523         context.setReplicatedLog(log);
524
525         // Send the last entry again.
526         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
527
528         follower = createBehavior(context);
529
530         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
531
532         assertEquals("Next index", 2, log.last().getIndex() + 1);
533         assertEquals("Entry 1", entries.get(0), log.get(1));
534
535         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
536
537         // Send the last entry again and also a new one.
538
539         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
540
541         leaderActor.underlyingActor().clear();
542         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
543
544         assertEquals("Next index", 3, log.last().getIndex() + 1);
545         assertEquals("Entry 1", entries.get(0), log.get(1));
546         assertEquals("Entry 2", entries.get(1), log.get(2));
547
548         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
549     }
550
551     @Test
552     public void testHandleAppendEntriesAfterInstallingSnapshot(){
553         logStart("testHandleAppendAfterInstallingSnapshot");
554
555         MockRaftActorContext context = createActorContext();
556
557         // Prepare the receivers log
558         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
559
560         // Set up a log as if it has been snapshotted
561         log.setSnapshotIndex(3);
562         log.setSnapshotTerm(1);
563
564         context.setReplicatedLog(log);
565
566         // Prepare the entries to be sent with AppendEntries
567         List<ReplicatedLogEntry> entries = new ArrayList<>();
568         entries.add(newReplicatedLogEntry(1, 4, "four"));
569
570         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
571
572         follower = createBehavior(context);
573
574         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
575
576         Assert.assertSame(follower, newBehavior);
577
578         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
579     }
580
581
582     /**
583      * This test verifies that when InstallSnapshot is received by
584      * the follower its applied correctly.
585      *
586      * @throws Exception
587      */
588     @Test
589     public void testHandleInstallSnapshot() throws Exception {
590         logStart("testHandleInstallSnapshot");
591
592         MockRaftActorContext context = createActorContext();
593
594         follower = createBehavior(context);
595
596         ByteString bsSnapshot  = createSnapshot();
597         int offset = 0;
598         int snapshotLength = bsSnapshot.size();
599         int chunkSize = 50;
600         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
601         int lastIncludedIndex = 1;
602         int chunkIndex = 1;
603         InstallSnapshot lastInstallSnapshot = null;
604
605         for(int i = 0; i < totalChunks; i++) {
606             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
607             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
608                     chunkData, chunkIndex, totalChunks);
609             follower.handleMessage(leaderActor, lastInstallSnapshot);
610             offset = offset + 50;
611             lastIncludedIndex++;
612             chunkIndex++;
613         }
614
615         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
616                 ApplySnapshot.class);
617         Snapshot snapshot = applySnapshot.getSnapshot();
618         assertNotNull(lastInstallSnapshot);
619         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
620         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
621                 snapshot.getLastAppliedTerm());
622         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
623                 snapshot.getLastAppliedIndex());
624         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
625         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
626
627         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
628                 leaderActor, InstallSnapshotReply.class);
629         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
630
631         chunkIndex = 1;
632         for(InstallSnapshotReply reply: replies) {
633             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
634             assertEquals("getTerm", 1, reply.getTerm());
635             assertEquals("isSuccess", true, reply.isSuccess());
636             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
637         }
638
639         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
640     }
641
642
643     /**
644      * Verify that when an AppendEntries is sent to a follower during a snapshot install
645      * the Follower short-circuits the processing of the AppendEntries message.
646      *
647      * @throws Exception
648      */
649     @Test
650     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
651         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
652
653         MockRaftActorContext context = createActorContext();
654
655         follower = createBehavior(context);
656
657         ByteString bsSnapshot  = createSnapshot();
658         int snapshotLength = bsSnapshot.size();
659         int chunkSize = 50;
660         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
661         int lastIncludedIndex = 1;
662
663         // Check that snapshot installation is not in progress
664         assertNull(((Follower) follower).getSnapshotTracker());
665
666         // Make sure that we have more than 1 chunk to send
667         assertTrue(totalChunks > 1);
668
669         // Send an install snapshot with the first chunk to start the process of installing a snapshot
670         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
671         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
672                 chunkData, 1, totalChunks));
673
674         // Check if snapshot installation is in progress now
675         assertNotNull(((Follower) follower).getSnapshotTracker());
676
677         // Send an append entry
678         AppendEntries appendEntries = mock(AppendEntries.class);
679         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
680
681         follower.handleMessage(leaderActor, appendEntries);
682
683         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
684         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
685         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
686         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
687
688         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
689         verify(appendEntries, never()).getPrevLogIndex();
690
691     }
692
693     @Test
694     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
695         logStart("testInitialSyncUpWithHandleInstallSnapshot");
696
697         MockRaftActorContext context = createActorContext();
698
699         follower = createBehavior(context);
700
701         ByteString bsSnapshot  = createSnapshot();
702         int offset = 0;
703         int snapshotLength = bsSnapshot.size();
704         int chunkSize = 50;
705         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
706         int lastIncludedIndex = 1;
707         int chunkIndex = 1;
708         InstallSnapshot lastInstallSnapshot = null;
709
710         for(int i = 0; i < totalChunks; i++) {
711             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
712             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
713                     chunkData, chunkIndex, totalChunks);
714             follower.handleMessage(leaderActor, lastInstallSnapshot);
715             offset = offset + 50;
716             lastIncludedIndex++;
717             chunkIndex++;
718         }
719
720         FollowerInitialSyncUpStatus syncStatus =
721                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
722
723         assertFalse(syncStatus.isInitialSyncDone());
724
725         // Clear all the messages
726         followerActor.underlyingActor().clear();
727
728         context.setLastApplied(101);
729         context.setCommitIndex(101);
730         setLastLogEntry(context, 1, 101,
731                 new MockRaftActorContext.MockPayload(""));
732
733         List<ReplicatedLogEntry> entries = Arrays.asList(
734                 newReplicatedLogEntry(2, 101, "foo"));
735
736         // The new commitIndex is 101
737         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
738         follower.handleMessage(leaderActor, appendEntries);
739
740         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
741
742         assertTrue(syncStatus.isInitialSyncDone());
743     }
744
745     @Test
746     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
747         logStart("testHandleOutOfSequenceInstallSnapshot");
748
749         MockRaftActorContext context = createActorContext();
750
751         follower = createBehavior(context);
752
753         ByteString bsSnapshot = createSnapshot();
754
755         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
756                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
757         follower.handleMessage(leaderActor, installSnapshot);
758
759         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
760                 InstallSnapshotReply.class);
761
762         assertEquals("isSuccess", false, reply.isSuccess());
763         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
764         assertEquals("getTerm", 1, reply.getTerm());
765         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
766
767         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
768     }
769
770     @Test
771     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
772         MockRaftActorContext context = createActorContext();
773
774         Stopwatch stopwatch = Stopwatch.createStarted();
775
776         follower = createBehavior(context);
777
778         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
779
780         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
781
782         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
783     }
784
785     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
786         int snapshotLength = bs.size();
787         int start = offset;
788         int size = chunkSize;
789         if (chunkSize > snapshotLength) {
790             size = snapshotLength;
791         } else {
792             if ((start + chunkSize) > snapshotLength) {
793                 size = snapshotLength - start;
794             }
795         }
796         return bs.substring(start, start + size);
797     }
798
799     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
800             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
801
802         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
803                 AppendEntriesReply.class);
804
805         assertEquals("isSuccess", expSuccess, reply.isSuccess());
806         assertEquals("getTerm", expTerm, reply.getTerm());
807         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
808         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
809         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
810         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
811     }
812
813     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
814         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
815                 new MockRaftActorContext.MockPayload(data));
816     }
817
818     private ByteString createSnapshot(){
819         HashMap<String, String> followerSnapshot = new HashMap<>();
820         followerSnapshot.put("1", "A");
821         followerSnapshot.put("2", "B");
822         followerSnapshot.put("3", "C");
823
824         return toByteString(followerSnapshot);
825     }
826
827     @Override
828     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
829             ActorRef actorRef, RaftRPC rpc) throws Exception {
830         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
831
832         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
833         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
834     }
835
836     @Override
837     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
838             throws Exception {
839         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
840         assertEquals("isSuccess", true, reply.isSuccess());
841     }
842 }