Merge "BUG-2288: deprecate old binding Notification API elements"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.Snapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
31 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40
41 public class FollowerTest extends AbstractRaftActorBehaviorTest {
42
43     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
44             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
45
46     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
47             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
48
49     private RaftActorBehavior follower;
50
51     @Override
52     @After
53     public void tearDown() throws Exception {
54         if(follower != null) {
55             follower.close();
56         }
57
58         super.tearDown();
59     }
60
61     @Override
62     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
63         return new Follower(actorContext);
64     }
65
66     @Override
67     protected  MockRaftActorContext createActorContext() {
68         return createActorContext(followerActor);
69     }
70
71     @Override
72     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
73         return new MockRaftActorContext("follower", getSystem(), actorRef);
74     }
75
76     @Test
77     public void testThatAnElectionTimeoutIsTriggered(){
78         MockRaftActorContext actorContext = createActorContext();
79         follower = new Follower(actorContext);
80
81         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
82                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
83     }
84
85     @Test
86     public void testHandleElectionTimeout(){
87         logStart("testHandleElectionTimeout");
88
89         follower = new Follower(createActorContext());
90
91         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
92
93         assertTrue(raftBehavior instanceof Candidate);
94     }
95
96     @Test
97     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
98         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
99
100         RaftActorContext context = createActorContext();
101         long term = 1000;
102         context.getTermInformation().update(term, null);
103
104         follower = createBehavior(context);
105
106         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
107
108         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
109
110         assertEquals("isVoteGranted", true, reply.isVoteGranted());
111         assertEquals("getTerm", term, reply.getTerm());
112     }
113
114     @Test
115     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
116         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
117
118         RaftActorContext context = createActorContext();
119         long term = 1000;
120         context.getTermInformation().update(term, "test");
121
122         follower = createBehavior(context);
123
124         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
125
126         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
127
128         assertEquals("isVoteGranted", false, reply.isVoteGranted());
129     }
130
131
132     @Test
133     public void testHandleFirstAppendEntries() throws Exception {
134         logStart("testHandleFirstAppendEntries");
135
136         MockRaftActorContext context = createActorContext();
137
138         List<ReplicatedLogEntry> entries = Arrays.asList(
139                 newReplicatedLogEntry(2, 101, "foo"));
140
141         // The new commitIndex is 101
142         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
143
144         follower = createBehavior(context);
145         follower.handleMessage(leaderActor, appendEntries);
146
147         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
148
149         assertFalse(syncStatus.isInitialSyncDone());
150     }
151
152     @Test
153     public void testHandleSyncUpAppendEntries() throws Exception {
154         logStart("testHandleSyncUpAppendEntries");
155
156         MockRaftActorContext context = createActorContext();
157
158         List<ReplicatedLogEntry> entries = Arrays.asList(
159                 newReplicatedLogEntry(2, 101, "foo"));
160
161         // The new commitIndex is 101
162         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
163
164         follower = createBehavior(context);
165         follower.handleMessage(leaderActor, appendEntries);
166
167         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
168
169         assertFalse(syncStatus.isInitialSyncDone());
170
171         // Clear all the messages
172         followerActor.underlyingActor().clear();
173
174         context.setLastApplied(101);
175         context.setCommitIndex(101);
176         setLastLogEntry(context, 1, 101,
177                 new MockRaftActorContext.MockPayload(""));
178
179         entries = Arrays.asList(
180                 newReplicatedLogEntry(2, 101, "foo"));
181
182         // The new commitIndex is 101
183         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
184         follower.handleMessage(leaderActor, appendEntries);
185
186         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
187
188         assertTrue(syncStatus.isInitialSyncDone());
189
190         followerActor.underlyingActor().clear();
191
192         // Sending the same message again should not generate another message
193
194         follower.handleMessage(leaderActor, appendEntries);
195
196         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
197
198         assertNull(syncStatus);
199
200     }
201
202     @Test
203     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
204         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
205
206         MockRaftActorContext context = createActorContext();
207
208         List<ReplicatedLogEntry> entries = Arrays.asList(
209                 newReplicatedLogEntry(2, 101, "foo"));
210
211         // The new commitIndex is 101
212         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
213
214         follower = createBehavior(context);
215         follower.handleMessage(leaderActor, appendEntries);
216
217         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
218
219         assertFalse(syncStatus.isInitialSyncDone());
220
221         // Clear all the messages
222         followerActor.underlyingActor().clear();
223
224         context.setLastApplied(100);
225         setLastLogEntry(context, 1, 100,
226                 new MockRaftActorContext.MockPayload(""));
227
228         entries = Arrays.asList(
229                 newReplicatedLogEntry(2, 101, "foo"));
230
231         // leader-2 is becoming the leader now and it says the commitIndex is 45
232         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
233         follower.handleMessage(leaderActor, appendEntries);
234
235         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
236
237         // We get a new message saying initial status is not done
238         assertFalse(syncStatus.isInitialSyncDone());
239
240     }
241
242
243     @Test
244     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
245         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
246
247         MockRaftActorContext context = createActorContext();
248
249         List<ReplicatedLogEntry> entries = Arrays.asList(
250                 newReplicatedLogEntry(2, 101, "foo"));
251
252         // The new commitIndex is 101
253         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
254
255         follower = createBehavior(context);
256         follower.handleMessage(leaderActor, appendEntries);
257
258         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
259
260         assertFalse(syncStatus.isInitialSyncDone());
261
262         // Clear all the messages
263         followerActor.underlyingActor().clear();
264
265         context.setLastApplied(101);
266         context.setCommitIndex(101);
267         setLastLogEntry(context, 1, 101,
268                 new MockRaftActorContext.MockPayload(""));
269
270         entries = Arrays.asList(
271                 newReplicatedLogEntry(2, 101, "foo"));
272
273         // The new commitIndex is 101
274         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
275         follower.handleMessage(leaderActor, appendEntries);
276
277         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
278
279         assertTrue(syncStatus.isInitialSyncDone());
280
281         // Clear all the messages
282         followerActor.underlyingActor().clear();
283
284         context.setLastApplied(100);
285         setLastLogEntry(context, 1, 100,
286                 new MockRaftActorContext.MockPayload(""));
287
288         entries = Arrays.asList(
289                 newReplicatedLogEntry(2, 101, "foo"));
290
291         // leader-2 is becoming the leader now and it says the commitIndex is 45
292         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
293         follower.handleMessage(leaderActor, appendEntries);
294
295         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
296
297         // We get a new message saying initial status is not done
298         assertFalse(syncStatus.isInitialSyncDone());
299
300     }
301
302
303     /**
304      * This test verifies that when an AppendEntries RPC is received by a RaftActor
305      * with a commitIndex that is greater than what has been applied to the
306      * state machine of the RaftActor, the RaftActor applies the state and
307      * sets it current applied state to the commitIndex of the sender.
308      *
309      * @throws Exception
310      */
311     @Test
312     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
313         logStart("testHandleAppendEntriesWithNewerCommitIndex");
314
315         MockRaftActorContext context = createActorContext();
316
317         context.setLastApplied(100);
318         setLastLogEntry(context, 1, 100,
319                 new MockRaftActorContext.MockPayload(""));
320         context.getReplicatedLog().setSnapshotIndex(99);
321
322         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
323                 newReplicatedLogEntry(2, 101, "foo"));
324
325         // The new commitIndex is 101
326         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
327
328         follower = createBehavior(context);
329         follower.handleMessage(leaderActor, appendEntries);
330
331         assertEquals("getLastApplied", 101L, context.getLastApplied());
332     }
333
334     /**
335      * This test verifies that when an AppendEntries is received a specific prevLogTerm
336      * which does not match the term that is in RaftActors log entry at prevLogIndex
337      * then the RaftActor does not change it's state and it returns a failure.
338      *
339      * @throws Exception
340      */
341     @Test
342     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
343         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
344
345         MockRaftActorContext context = createActorContext();
346
347         // First set the receivers term to lower number
348         context.getTermInformation().update(95, "test");
349
350         // AppendEntries is now sent with a bigger term
351         // this will set the receivers term to be the same as the sender's term
352         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
353
354         follower = createBehavior(context);
355
356         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
357
358         Assert.assertSame(follower, newBehavior);
359
360         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
361                 AppendEntriesReply.class);
362
363         assertEquals("isSuccess", false, reply.isSuccess());
364     }
365
366     /**
367      * This test verifies that when a new AppendEntries message is received with
368      * new entries and the logs of the sender and receiver match that the new
369      * entries get added to the log and the log is incremented by the number of
370      * entries received in appendEntries
371      *
372      * @throws Exception
373      */
374     @Test
375     public void testHandleAppendEntriesAddNewEntries() {
376         logStart("testHandleAppendEntriesAddNewEntries");
377
378         MockRaftActorContext context = createActorContext();
379
380         // First set the receivers term to lower number
381         context.getTermInformation().update(1, "test");
382
383         // Prepare the receivers log
384         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
385         log.append(newReplicatedLogEntry(1, 0, "zero"));
386         log.append(newReplicatedLogEntry(1, 1, "one"));
387         log.append(newReplicatedLogEntry(1, 2, "two"));
388
389         context.setReplicatedLog(log);
390
391         // Prepare the entries to be sent with AppendEntries
392         List<ReplicatedLogEntry> entries = new ArrayList<>();
393         entries.add(newReplicatedLogEntry(1, 3, "three"));
394         entries.add(newReplicatedLogEntry(1, 4, "four"));
395
396         // Send appendEntries with the same term as was set on the receiver
397         // before the new behavior was created (1 in this case)
398         // This will not work for a Candidate because as soon as a Candidate
399         // is created it increments the term
400         AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
401
402         follower = createBehavior(context);
403
404         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
405
406         Assert.assertSame(follower, newBehavior);
407
408         assertEquals("Next index", 5, log.last().getIndex() + 1);
409         assertEquals("Entry 3", entries.get(0), log.get(3));
410         assertEquals("Entry 4", entries.get(1), log.get(4));
411
412         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
413     }
414
415     /**
416      * This test verifies that when a new AppendEntries message is received with
417      * new entries and the logs of the sender and receiver are out-of-sync that
418      * the log is first corrected by removing the out of sync entries from the
419      * log and then adding in the new entries sent with the AppendEntries message
420      */
421     @Test
422     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
423         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
424
425         MockRaftActorContext context = createActorContext();
426
427         // First set the receivers term to lower number
428         context.getTermInformation().update(1, "test");
429
430         // Prepare the receivers log
431         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
432         log.append(newReplicatedLogEntry(1, 0, "zero"));
433         log.append(newReplicatedLogEntry(1, 1, "one"));
434         log.append(newReplicatedLogEntry(1, 2, "two"));
435
436         context.setReplicatedLog(log);
437
438         // Prepare the entries to be sent with AppendEntries
439         List<ReplicatedLogEntry> entries = new ArrayList<>();
440         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
441         entries.add(newReplicatedLogEntry(2, 3, "three"));
442
443         // Send appendEntries with the same term as was set on the receiver
444         // before the new behavior was created (1 in this case)
445         // This will not work for a Candidate because as soon as a Candidate
446         // is created it increments the term
447         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
448
449         follower = createBehavior(context);
450
451         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
452
453         Assert.assertSame(follower, newBehavior);
454
455         // The entry at index 2 will be found out-of-sync with the leader
456         // and will be removed
457         // Then the two new entries will be added to the log
458         // Thus making the log to have 4 entries
459         assertEquals("Next index", 4, log.last().getIndex() + 1);
460         //assertEquals("Entry 2", entries.get(0), log.get(2));
461
462         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
463
464         // Check that the entry at index 2 has the new data
465         assertEquals("Entry 2", entries.get(0), log.get(2));
466
467         assertEquals("Entry 3", entries.get(1), log.get(3));
468
469         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
470     }
471
472     @Test
473     public void testHandleAppendEntriesPreviousLogEntryMissing(){
474         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
475
476         MockRaftActorContext context = createActorContext();
477
478         // Prepare the receivers log
479         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
480         log.append(newReplicatedLogEntry(1, 0, "zero"));
481         log.append(newReplicatedLogEntry(1, 1, "one"));
482         log.append(newReplicatedLogEntry(1, 2, "two"));
483
484         context.setReplicatedLog(log);
485
486         // Prepare the entries to be sent with AppendEntries
487         List<ReplicatedLogEntry> entries = new ArrayList<>();
488         entries.add(newReplicatedLogEntry(1, 4, "four"));
489
490         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
491
492         follower = createBehavior(context);
493
494         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
495
496         Assert.assertSame(follower, newBehavior);
497
498         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
499     }
500
501     @Test
502     public void testHandleAppendEntriesWithExistingLogEntry() {
503         logStart("testHandleAppendEntriesWithExistingLogEntry");
504
505         MockRaftActorContext context = createActorContext();
506
507         context.getTermInformation().update(1, "test");
508
509         // Prepare the receivers log
510         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
511         log.append(newReplicatedLogEntry(1, 0, "zero"));
512         log.append(newReplicatedLogEntry(1, 1, "one"));
513
514         context.setReplicatedLog(log);
515
516         // Send the last entry again.
517         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
518
519         follower = createBehavior(context);
520
521         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
522
523         assertEquals("Next index", 2, log.last().getIndex() + 1);
524         assertEquals("Entry 1", entries.get(0), log.get(1));
525
526         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
527
528         // Send the last entry again and also a new one.
529
530         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
531
532         leaderActor.underlyingActor().clear();
533         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
534
535         assertEquals("Next index", 3, log.last().getIndex() + 1);
536         assertEquals("Entry 1", entries.get(0), log.get(1));
537         assertEquals("Entry 2", entries.get(1), log.get(2));
538
539         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
540     }
541
542     @Test
543     public void testHandleAppendEntriesAfterInstallingSnapshot(){
544         logStart("testHandleAppendAfterInstallingSnapshot");
545
546         MockRaftActorContext context = createActorContext();
547
548         // Prepare the receivers log
549         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
550
551         // Set up a log as if it has been snapshotted
552         log.setSnapshotIndex(3);
553         log.setSnapshotTerm(1);
554
555         context.setReplicatedLog(log);
556
557         // Prepare the entries to be sent with AppendEntries
558         List<ReplicatedLogEntry> entries = new ArrayList<>();
559         entries.add(newReplicatedLogEntry(1, 4, "four"));
560
561         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
562
563         follower = createBehavior(context);
564
565         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
566
567         Assert.assertSame(follower, newBehavior);
568
569         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
570     }
571
572
573     /**
574      * This test verifies that when InstallSnapshot is received by
575      * the follower its applied correctly.
576      *
577      * @throws Exception
578      */
579     @Test
580     public void testHandleInstallSnapshot() throws Exception {
581         logStart("testHandleInstallSnapshot");
582
583         MockRaftActorContext context = createActorContext();
584
585         follower = createBehavior(context);
586
587         ByteString bsSnapshot  = createSnapshot();
588         int offset = 0;
589         int snapshotLength = bsSnapshot.size();
590         int chunkSize = 50;
591         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
592         int lastIncludedIndex = 1;
593         int chunkIndex = 1;
594         InstallSnapshot lastInstallSnapshot = null;
595
596         for(int i = 0; i < totalChunks; i++) {
597             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
598             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
599                     chunkData, chunkIndex, totalChunks);
600             follower.handleMessage(leaderActor, lastInstallSnapshot);
601             offset = offset + 50;
602             lastIncludedIndex++;
603             chunkIndex++;
604         }
605
606         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
607                 ApplySnapshot.class);
608         Snapshot snapshot = applySnapshot.getSnapshot();
609         assertNotNull(lastInstallSnapshot);
610         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
611         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
612                 snapshot.getLastAppliedTerm());
613         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
614                 snapshot.getLastAppliedIndex());
615         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
616         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
617
618         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
619                 leaderActor, InstallSnapshotReply.class);
620         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
621
622         chunkIndex = 1;
623         for(InstallSnapshotReply reply: replies) {
624             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
625             assertEquals("getTerm", 1, reply.getTerm());
626             assertEquals("isSuccess", true, reply.isSuccess());
627             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
628         }
629
630         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
631     }
632
633
634     /**
635      * Verify that when an AppendEntries is sent to a follower during a snapshot install
636      * the Follower short-circuits the processing of the AppendEntries message.
637      *
638      * @throws Exception
639      */
640     @Test
641     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
642         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
643
644         MockRaftActorContext context = createActorContext();
645
646         follower = createBehavior(context);
647
648         ByteString bsSnapshot  = createSnapshot();
649         int snapshotLength = bsSnapshot.size();
650         int chunkSize = 50;
651         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
652         int lastIncludedIndex = 1;
653
654         // Check that snapshot installation is not in progress
655         assertNull(((Follower) follower).getSnapshotTracker());
656
657         // Make sure that we have more than 1 chunk to send
658         assertTrue(totalChunks > 1);
659
660         // Send an install snapshot with the first chunk to start the process of installing a snapshot
661         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
662         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
663                 chunkData, 1, totalChunks));
664
665         // Check if snapshot installation is in progress now
666         assertNotNull(((Follower) follower).getSnapshotTracker());
667
668         // Send an append entry
669         AppendEntries appendEntries = mock(AppendEntries.class);
670         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
671
672         follower.handleMessage(leaderActor, appendEntries);
673
674         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
675         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
676         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
677         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
678
679         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
680         verify(appendEntries, never()).getPrevLogIndex();
681
682     }
683
684     @Test
685     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
686         logStart("testInitialSyncUpWithHandleInstallSnapshot");
687
688         MockRaftActorContext context = createActorContext();
689
690         follower = createBehavior(context);
691
692         ByteString bsSnapshot  = createSnapshot();
693         int offset = 0;
694         int snapshotLength = bsSnapshot.size();
695         int chunkSize = 50;
696         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
697         int lastIncludedIndex = 1;
698         int chunkIndex = 1;
699         InstallSnapshot lastInstallSnapshot = null;
700
701         for(int i = 0; i < totalChunks; i++) {
702             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
703             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
704                     chunkData, chunkIndex, totalChunks);
705             follower.handleMessage(leaderActor, lastInstallSnapshot);
706             offset = offset + 50;
707             lastIncludedIndex++;
708             chunkIndex++;
709         }
710
711         FollowerInitialSyncUpStatus syncStatus =
712                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
713
714         assertFalse(syncStatus.isInitialSyncDone());
715
716         // Clear all the messages
717         followerActor.underlyingActor().clear();
718
719         context.setLastApplied(101);
720         context.setCommitIndex(101);
721         setLastLogEntry(context, 1, 101,
722                 new MockRaftActorContext.MockPayload(""));
723
724         List<ReplicatedLogEntry> entries = Arrays.asList(
725                 newReplicatedLogEntry(2, 101, "foo"));
726
727         // The new commitIndex is 101
728         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
729         follower.handleMessage(leaderActor, appendEntries);
730
731         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
732
733         assertTrue(syncStatus.isInitialSyncDone());
734     }
735
736     @Test
737     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
738         logStart("testHandleOutOfSequenceInstallSnapshot");
739
740         MockRaftActorContext context = createActorContext();
741
742         follower = createBehavior(context);
743
744         ByteString bsSnapshot = createSnapshot();
745
746         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
747                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
748         follower.handleMessage(leaderActor, installSnapshot);
749
750         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
751                 InstallSnapshotReply.class);
752
753         assertEquals("isSuccess", false, reply.isSuccess());
754         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
755         assertEquals("getTerm", 1, reply.getTerm());
756         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
757
758         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
759     }
760
761     @Test
762     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
763         MockRaftActorContext context = createActorContext();
764
765         Stopwatch stopwatch = Stopwatch.createStarted();
766
767         follower = createBehavior(context);
768
769         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
770
771         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
772
773         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
774     }
775
776     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
777         int snapshotLength = bs.size();
778         int start = offset;
779         int size = chunkSize;
780         if (chunkSize > snapshotLength) {
781             size = snapshotLength;
782         } else {
783             if ((start + chunkSize) > snapshotLength) {
784                 size = snapshotLength - start;
785             }
786         }
787         return bs.substring(start, start + size);
788     }
789
790     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
791             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
792
793         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
794                 AppendEntriesReply.class);
795
796         assertEquals("isSuccess", expSuccess, reply.isSuccess());
797         assertEquals("getTerm", expTerm, reply.getTerm());
798         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
799         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
800         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
801     }
802
803     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
804         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
805                 new MockRaftActorContext.MockPayload(data));
806     }
807
808     private ByteString createSnapshot(){
809         HashMap<String, String> followerSnapshot = new HashMap<>();
810         followerSnapshot.put("1", "A");
811         followerSnapshot.put("2", "B");
812         followerSnapshot.put("3", "C");
813
814         return toByteString(followerSnapshot);
815     }
816
817     @Override
818     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
819             ActorRef actorRef, RaftRPC rpc) throws Exception {
820         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
821
822         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
823         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
824     }
825
826     @Override
827     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
828             throws Exception {
829         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
830         assertEquals("isSuccess", true, reply.isSuccess());
831     }
832 }