443ebc31a4ae49380f92d3bfa341879882d137e5
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Stopwatch;
16 import com.google.protobuf.ByteString;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
28 import org.opendaylight.controller.cluster.raft.Snapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
31 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
35 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
37 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
38 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40
41 public class FollowerTest extends AbstractRaftActorBehaviorTest {
42
43     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
44             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
45
46     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
47             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
48
49     private RaftActorBehavior follower;
50
51     private final short payloadVersion = 5;
52
53     @Override
54     @After
55     public void tearDown() throws Exception {
56         if(follower != null) {
57             follower.close();
58         }
59
60         super.tearDown();
61     }
62
63     @Override
64     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
65         return new Follower(actorContext);
66     }
67
68     @Override
69     protected  MockRaftActorContext createActorContext() {
70         return createActorContext(followerActor);
71     }
72
73     @Override
74     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
75         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
76         context.setPayloadVersion(payloadVersion );
77         return context;
78     }
79
80     @Test
81     public void testThatAnElectionTimeoutIsTriggered(){
82         MockRaftActorContext actorContext = createActorContext();
83         follower = new Follower(actorContext);
84
85         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
86                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
87     }
88
89     @Test
90     public void testHandleElectionTimeout(){
91         logStart("testHandleElectionTimeout");
92
93         follower = new Follower(createActorContext());
94
95         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
96
97         assertTrue(raftBehavior instanceof Candidate);
98     }
99
100     @Test
101     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
102         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
103
104         RaftActorContext context = createActorContext();
105         long term = 1000;
106         context.getTermInformation().update(term, null);
107
108         follower = createBehavior(context);
109
110         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
111
112         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
113
114         assertEquals("isVoteGranted", true, reply.isVoteGranted());
115         assertEquals("getTerm", term, reply.getTerm());
116     }
117
118     @Test
119     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
120         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
121
122         RaftActorContext context = createActorContext();
123         long term = 1000;
124         context.getTermInformation().update(term, "test");
125
126         follower = createBehavior(context);
127
128         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
129
130         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
131
132         assertEquals("isVoteGranted", false, reply.isVoteGranted());
133     }
134
135
136     @Test
137     public void testHandleFirstAppendEntries() throws Exception {
138         logStart("testHandleFirstAppendEntries");
139
140         MockRaftActorContext context = createActorContext();
141
142         List<ReplicatedLogEntry> entries = Arrays.asList(
143                 newReplicatedLogEntry(2, 101, "foo"));
144
145         // The new commitIndex is 101
146         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
147
148         follower = createBehavior(context);
149         follower.handleMessage(leaderActor, appendEntries);
150
151         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
152
153         assertFalse(syncStatus.isInitialSyncDone());
154     }
155
156     @Test
157     public void testHandleSyncUpAppendEntries() throws Exception {
158         logStart("testHandleSyncUpAppendEntries");
159
160         MockRaftActorContext context = createActorContext();
161
162         List<ReplicatedLogEntry> entries = Arrays.asList(
163                 newReplicatedLogEntry(2, 101, "foo"));
164
165         // The new commitIndex is 101
166         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
167
168         follower = createBehavior(context);
169         follower.handleMessage(leaderActor, appendEntries);
170
171         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
172
173         assertFalse(syncStatus.isInitialSyncDone());
174
175         // Clear all the messages
176         followerActor.underlyingActor().clear();
177
178         context.setLastApplied(101);
179         context.setCommitIndex(101);
180         setLastLogEntry(context, 1, 101,
181                 new MockRaftActorContext.MockPayload(""));
182
183         entries = Arrays.asList(
184                 newReplicatedLogEntry(2, 101, "foo"));
185
186         // The new commitIndex is 101
187         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
188         follower.handleMessage(leaderActor, appendEntries);
189
190         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
191
192         assertTrue(syncStatus.isInitialSyncDone());
193
194         followerActor.underlyingActor().clear();
195
196         // Sending the same message again should not generate another message
197
198         follower.handleMessage(leaderActor, appendEntries);
199
200         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
201
202         assertNull(syncStatus);
203
204     }
205
206     @Test
207     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
208         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
209
210         MockRaftActorContext context = createActorContext();
211
212         List<ReplicatedLogEntry> entries = Arrays.asList(
213                 newReplicatedLogEntry(2, 101, "foo"));
214
215         // The new commitIndex is 101
216         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
217
218         follower = createBehavior(context);
219         follower.handleMessage(leaderActor, appendEntries);
220
221         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
222
223         assertFalse(syncStatus.isInitialSyncDone());
224
225         // Clear all the messages
226         followerActor.underlyingActor().clear();
227
228         context.setLastApplied(100);
229         setLastLogEntry(context, 1, 100,
230                 new MockRaftActorContext.MockPayload(""));
231
232         entries = Arrays.asList(
233                 newReplicatedLogEntry(2, 101, "foo"));
234
235         // leader-2 is becoming the leader now and it says the commitIndex is 45
236         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
237         follower.handleMessage(leaderActor, appendEntries);
238
239         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
240
241         // We get a new message saying initial status is not done
242         assertFalse(syncStatus.isInitialSyncDone());
243
244     }
245
246
247     @Test
248     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
249         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
250
251         MockRaftActorContext context = createActorContext();
252
253         List<ReplicatedLogEntry> entries = Arrays.asList(
254                 newReplicatedLogEntry(2, 101, "foo"));
255
256         // The new commitIndex is 101
257         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
258
259         follower = createBehavior(context);
260         follower.handleMessage(leaderActor, appendEntries);
261
262         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
263
264         assertFalse(syncStatus.isInitialSyncDone());
265
266         // Clear all the messages
267         followerActor.underlyingActor().clear();
268
269         context.setLastApplied(101);
270         context.setCommitIndex(101);
271         setLastLogEntry(context, 1, 101,
272                 new MockRaftActorContext.MockPayload(""));
273
274         entries = Arrays.asList(
275                 newReplicatedLogEntry(2, 101, "foo"));
276
277         // The new commitIndex is 101
278         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
279         follower.handleMessage(leaderActor, appendEntries);
280
281         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
282
283         assertTrue(syncStatus.isInitialSyncDone());
284
285         // Clear all the messages
286         followerActor.underlyingActor().clear();
287
288         context.setLastApplied(100);
289         setLastLogEntry(context, 1, 100,
290                 new MockRaftActorContext.MockPayload(""));
291
292         entries = Arrays.asList(
293                 newReplicatedLogEntry(2, 101, "foo"));
294
295         // leader-2 is becoming the leader now and it says the commitIndex is 45
296         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
297         follower.handleMessage(leaderActor, appendEntries);
298
299         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
300
301         // We get a new message saying initial status is not done
302         assertFalse(syncStatus.isInitialSyncDone());
303
304     }
305
306
307     /**
308      * This test verifies that when an AppendEntries RPC is received by a RaftActor
309      * with a commitIndex that is greater than what has been applied to the
310      * state machine of the RaftActor, the RaftActor applies the state and
311      * sets it current applied state to the commitIndex of the sender.
312      *
313      * @throws Exception
314      */
315     @Test
316     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
317         logStart("testHandleAppendEntriesWithNewerCommitIndex");
318
319         MockRaftActorContext context = createActorContext();
320
321         context.setLastApplied(100);
322         setLastLogEntry(context, 1, 100,
323                 new MockRaftActorContext.MockPayload(""));
324         context.getReplicatedLog().setSnapshotIndex(99);
325
326         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
327                 newReplicatedLogEntry(2, 101, "foo"));
328
329         // The new commitIndex is 101
330         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
331
332         follower = createBehavior(context);
333         follower.handleMessage(leaderActor, appendEntries);
334
335         assertEquals("getLastApplied", 101L, context.getLastApplied());
336     }
337
338     /**
339      * This test verifies that when an AppendEntries is received a specific prevLogTerm
340      * which does not match the term that is in RaftActors log entry at prevLogIndex
341      * then the RaftActor does not change it's state and it returns a failure.
342      *
343      * @throws Exception
344      */
345     @Test
346     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
347         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
348
349         MockRaftActorContext context = createActorContext();
350
351         // First set the receivers term to lower number
352         context.getTermInformation().update(95, "test");
353
354         // AppendEntries is now sent with a bigger term
355         // this will set the receivers term to be the same as the sender's term
356         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
357
358         follower = createBehavior(context);
359
360         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
361
362         Assert.assertSame(follower, newBehavior);
363
364         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
365                 AppendEntriesReply.class);
366
367         assertEquals("isSuccess", false, reply.isSuccess());
368     }
369
370     /**
371      * This test verifies that when a new AppendEntries message is received with
372      * new entries and the logs of the sender and receiver match that the new
373      * entries get added to the log and the log is incremented by the number of
374      * entries received in appendEntries
375      *
376      * @throws Exception
377      */
378     @Test
379     public void testHandleAppendEntriesAddNewEntries() {
380         logStart("testHandleAppendEntriesAddNewEntries");
381
382         MockRaftActorContext context = createActorContext();
383
384         // First set the receivers term to lower number
385         context.getTermInformation().update(1, "test");
386
387         // Prepare the receivers log
388         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
389         log.append(newReplicatedLogEntry(1, 0, "zero"));
390         log.append(newReplicatedLogEntry(1, 1, "one"));
391         log.append(newReplicatedLogEntry(1, 2, "two"));
392
393         context.setReplicatedLog(log);
394
395         // Prepare the entries to be sent with AppendEntries
396         List<ReplicatedLogEntry> entries = new ArrayList<>();
397         entries.add(newReplicatedLogEntry(1, 3, "three"));
398         entries.add(newReplicatedLogEntry(1, 4, "four"));
399
400         // Send appendEntries with the same term as was set on the receiver
401         // before the new behavior was created (1 in this case)
402         // This will not work for a Candidate because as soon as a Candidate
403         // is created it increments the term
404         AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0);
405
406         follower = createBehavior(context);
407
408         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
409
410         Assert.assertSame(follower, newBehavior);
411
412         assertEquals("Next index", 5, log.last().getIndex() + 1);
413         assertEquals("Entry 3", entries.get(0), log.get(3));
414         assertEquals("Entry 4", entries.get(1), log.get(4));
415
416         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
417     }
418
419     /**
420      * This test verifies that when a new AppendEntries message is received with
421      * new entries and the logs of the sender and receiver are out-of-sync that
422      * the log is first corrected by removing the out of sync entries from the
423      * log and then adding in the new entries sent with the AppendEntries message
424      */
425     @Test
426     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
427         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
428
429         MockRaftActorContext context = createActorContext();
430
431         // First set the receivers term to lower number
432         context.getTermInformation().update(1, "test");
433
434         // Prepare the receivers log
435         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
436         log.append(newReplicatedLogEntry(1, 0, "zero"));
437         log.append(newReplicatedLogEntry(1, 1, "one"));
438         log.append(newReplicatedLogEntry(1, 2, "two"));
439
440         context.setReplicatedLog(log);
441
442         // Prepare the entries to be sent with AppendEntries
443         List<ReplicatedLogEntry> entries = new ArrayList<>();
444         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
445         entries.add(newReplicatedLogEntry(2, 3, "three"));
446
447         // Send appendEntries with the same term as was set on the receiver
448         // before the new behavior was created (1 in this case)
449         // This will not work for a Candidate because as soon as a Candidate
450         // is created it increments the term
451         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
452
453         follower = createBehavior(context);
454
455         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
456
457         Assert.assertSame(follower, newBehavior);
458
459         // The entry at index 2 will be found out-of-sync with the leader
460         // and will be removed
461         // Then the two new entries will be added to the log
462         // Thus making the log to have 4 entries
463         assertEquals("Next index", 4, log.last().getIndex() + 1);
464         //assertEquals("Entry 2", entries.get(0), log.get(2));
465
466         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
467
468         // Check that the entry at index 2 has the new data
469         assertEquals("Entry 2", entries.get(0), log.get(2));
470
471         assertEquals("Entry 3", entries.get(1), log.get(3));
472
473         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
474     }
475
476     @Test
477     public void testHandleAppendEntriesPreviousLogEntryMissing(){
478         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
479
480         MockRaftActorContext context = createActorContext();
481
482         // Prepare the receivers log
483         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
484         log.append(newReplicatedLogEntry(1, 0, "zero"));
485         log.append(newReplicatedLogEntry(1, 1, "one"));
486         log.append(newReplicatedLogEntry(1, 2, "two"));
487
488         context.setReplicatedLog(log);
489
490         // Prepare the entries to be sent with AppendEntries
491         List<ReplicatedLogEntry> entries = new ArrayList<>();
492         entries.add(newReplicatedLogEntry(1, 4, "four"));
493
494         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
495
496         follower = createBehavior(context);
497
498         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
499
500         Assert.assertSame(follower, newBehavior);
501
502         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
503     }
504
505     @Test
506     public void testHandleAppendEntriesWithExistingLogEntry() {
507         logStart("testHandleAppendEntriesWithExistingLogEntry");
508
509         MockRaftActorContext context = createActorContext();
510
511         context.getTermInformation().update(1, "test");
512
513         // Prepare the receivers log
514         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
515         log.append(newReplicatedLogEntry(1, 0, "zero"));
516         log.append(newReplicatedLogEntry(1, 1, "one"));
517
518         context.setReplicatedLog(log);
519
520         // Send the last entry again.
521         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
522
523         follower = createBehavior(context);
524
525         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
526
527         assertEquals("Next index", 2, log.last().getIndex() + 1);
528         assertEquals("Entry 1", entries.get(0), log.get(1));
529
530         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
531
532         // Send the last entry again and also a new one.
533
534         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
535
536         leaderActor.underlyingActor().clear();
537         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
538
539         assertEquals("Next index", 3, log.last().getIndex() + 1);
540         assertEquals("Entry 1", entries.get(0), log.get(1));
541         assertEquals("Entry 2", entries.get(1), log.get(2));
542
543         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
544     }
545
546     @Test
547     public void testHandleAppendEntriesAfterInstallingSnapshot(){
548         logStart("testHandleAppendAfterInstallingSnapshot");
549
550         MockRaftActorContext context = createActorContext();
551
552         // Prepare the receivers log
553         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
554
555         // Set up a log as if it has been snapshotted
556         log.setSnapshotIndex(3);
557         log.setSnapshotTerm(1);
558
559         context.setReplicatedLog(log);
560
561         // Prepare the entries to be sent with AppendEntries
562         List<ReplicatedLogEntry> entries = new ArrayList<>();
563         entries.add(newReplicatedLogEntry(1, 4, "four"));
564
565         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
566
567         follower = createBehavior(context);
568
569         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
570
571         Assert.assertSame(follower, newBehavior);
572
573         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
574     }
575
576
577     /**
578      * This test verifies that when InstallSnapshot is received by
579      * the follower its applied correctly.
580      *
581      * @throws Exception
582      */
583     @Test
584     public void testHandleInstallSnapshot() throws Exception {
585         logStart("testHandleInstallSnapshot");
586
587         MockRaftActorContext context = createActorContext();
588
589         follower = createBehavior(context);
590
591         ByteString bsSnapshot  = createSnapshot();
592         int offset = 0;
593         int snapshotLength = bsSnapshot.size();
594         int chunkSize = 50;
595         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
596         int lastIncludedIndex = 1;
597         int chunkIndex = 1;
598         InstallSnapshot lastInstallSnapshot = null;
599
600         for(int i = 0; i < totalChunks; i++) {
601             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
602             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
603                     chunkData, chunkIndex, totalChunks);
604             follower.handleMessage(leaderActor, lastInstallSnapshot);
605             offset = offset + 50;
606             lastIncludedIndex++;
607             chunkIndex++;
608         }
609
610         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
611                 ApplySnapshot.class);
612         Snapshot snapshot = applySnapshot.getSnapshot();
613         assertNotNull(lastInstallSnapshot);
614         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
615         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
616                 snapshot.getLastAppliedTerm());
617         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
618                 snapshot.getLastAppliedIndex());
619         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
620         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
621
622         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
623                 leaderActor, InstallSnapshotReply.class);
624         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
625
626         chunkIndex = 1;
627         for(InstallSnapshotReply reply: replies) {
628             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
629             assertEquals("getTerm", 1, reply.getTerm());
630             assertEquals("isSuccess", true, reply.isSuccess());
631             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
632         }
633
634         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
635     }
636
637
638     /**
639      * Verify that when an AppendEntries is sent to a follower during a snapshot install
640      * the Follower short-circuits the processing of the AppendEntries message.
641      *
642      * @throws Exception
643      */
644     @Test
645     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
646         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
647
648         MockRaftActorContext context = createActorContext();
649
650         follower = createBehavior(context);
651
652         ByteString bsSnapshot  = createSnapshot();
653         int snapshotLength = bsSnapshot.size();
654         int chunkSize = 50;
655         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
656         int lastIncludedIndex = 1;
657
658         // Check that snapshot installation is not in progress
659         assertNull(((Follower) follower).getSnapshotTracker());
660
661         // Make sure that we have more than 1 chunk to send
662         assertTrue(totalChunks > 1);
663
664         // Send an install snapshot with the first chunk to start the process of installing a snapshot
665         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
666         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
667                 chunkData, 1, totalChunks));
668
669         // Check if snapshot installation is in progress now
670         assertNotNull(((Follower) follower).getSnapshotTracker());
671
672         // Send an append entry
673         AppendEntries appendEntries = mock(AppendEntries.class);
674         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
675
676         follower.handleMessage(leaderActor, appendEntries);
677
678         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
679         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
680         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
681         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
682
683         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
684         verify(appendEntries, never()).getPrevLogIndex();
685
686     }
687
688     @Test
689     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
690         logStart("testInitialSyncUpWithHandleInstallSnapshot");
691
692         MockRaftActorContext context = createActorContext();
693
694         follower = createBehavior(context);
695
696         ByteString bsSnapshot  = createSnapshot();
697         int offset = 0;
698         int snapshotLength = bsSnapshot.size();
699         int chunkSize = 50;
700         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
701         int lastIncludedIndex = 1;
702         int chunkIndex = 1;
703         InstallSnapshot lastInstallSnapshot = null;
704
705         for(int i = 0; i < totalChunks; i++) {
706             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
707             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
708                     chunkData, chunkIndex, totalChunks);
709             follower.handleMessage(leaderActor, lastInstallSnapshot);
710             offset = offset + 50;
711             lastIncludedIndex++;
712             chunkIndex++;
713         }
714
715         FollowerInitialSyncUpStatus syncStatus =
716                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
717
718         assertFalse(syncStatus.isInitialSyncDone());
719
720         // Clear all the messages
721         followerActor.underlyingActor().clear();
722
723         context.setLastApplied(101);
724         context.setCommitIndex(101);
725         setLastLogEntry(context, 1, 101,
726                 new MockRaftActorContext.MockPayload(""));
727
728         List<ReplicatedLogEntry> entries = Arrays.asList(
729                 newReplicatedLogEntry(2, 101, "foo"));
730
731         // The new commitIndex is 101
732         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
733         follower.handleMessage(leaderActor, appendEntries);
734
735         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
736
737         assertTrue(syncStatus.isInitialSyncDone());
738     }
739
740     @Test
741     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
742         logStart("testHandleOutOfSequenceInstallSnapshot");
743
744         MockRaftActorContext context = createActorContext();
745
746         follower = createBehavior(context);
747
748         ByteString bsSnapshot = createSnapshot();
749
750         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
751                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
752         follower.handleMessage(leaderActor, installSnapshot);
753
754         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
755                 InstallSnapshotReply.class);
756
757         assertEquals("isSuccess", false, reply.isSuccess());
758         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
759         assertEquals("getTerm", 1, reply.getTerm());
760         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
761
762         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
763     }
764
765     @Test
766     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
767         MockRaftActorContext context = createActorContext();
768
769         Stopwatch stopwatch = Stopwatch.createStarted();
770
771         follower = createBehavior(context);
772
773         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
774
775         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
776
777         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
778     }
779
780     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
781         int snapshotLength = bs.size();
782         int start = offset;
783         int size = chunkSize;
784         if (chunkSize > snapshotLength) {
785             size = snapshotLength;
786         } else {
787             if ((start + chunkSize) > snapshotLength) {
788                 size = snapshotLength - start;
789             }
790         }
791         return bs.substring(start, start + size);
792     }
793
794     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
795             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
796
797         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
798                 AppendEntriesReply.class);
799
800         assertEquals("isSuccess", expSuccess, reply.isSuccess());
801         assertEquals("getTerm", expTerm, reply.getTerm());
802         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
803         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
804         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
805         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
806     }
807
808     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
809         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
810                 new MockRaftActorContext.MockPayload(data));
811     }
812
813     private ByteString createSnapshot(){
814         HashMap<String, String> followerSnapshot = new HashMap<>();
815         followerSnapshot.put("1", "A");
816         followerSnapshot.put("2", "B");
817         followerSnapshot.put("3", "C");
818
819         return toByteString(followerSnapshot);
820     }
821
822     @Override
823     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
824             ActorRef actorRef, RaftRPC rpc) throws Exception {
825         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
826
827         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
828         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
829     }
830
831     @Override
832     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
833             throws Exception {
834         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
835         assertEquals("isSuccess", true, reply.isSuccess());
836     }
837 }