Merge "BUG-2424 Bump mina sshd-core version to 0.14"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.mock;
10 import static org.mockito.Mockito.never;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.testkit.TestActorRef;
15 import com.google.protobuf.ByteString;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.HashMap;
19 import java.util.List;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
24 import org.opendaylight.controller.cluster.raft.RaftActorContext;
25 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
26 import org.opendaylight.controller.cluster.raft.Snapshot;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
28 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
29 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
35 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
36 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
37 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
38
39 public class FollowerTest extends AbstractRaftActorBehaviorTest {
40
41     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
42             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
43
44     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
45             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
46
47     private RaftActorBehavior follower;
48
49     @Override
50     @After
51     public void tearDown() throws Exception {
52         if(follower != null) {
53             follower.close();
54         }
55
56         super.tearDown();
57     }
58
59     @Override
60     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
61         return new Follower(actorContext);
62     }
63
64     @Override
65     protected  MockRaftActorContext createActorContext() {
66         return createActorContext(followerActor);
67     }
68
69     @Override
70     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
71         return new MockRaftActorContext("follower", getSystem(), actorRef);
72     }
73
74     @Test
75     public void testThatAnElectionTimeoutIsTriggered(){
76         MockRaftActorContext actorContext = createActorContext();
77         follower = new Follower(actorContext);
78
79         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
80                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
81     }
82
83     @Test
84     public void testHandleElectionTimeout(){
85         logStart("testHandleElectionTimeout");
86
87         follower = new Follower(createActorContext());
88
89         RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
90
91         assertTrue(raftBehavior instanceof Candidate);
92     }
93
94     @Test
95     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
96         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
97
98         RaftActorContext context = createActorContext();
99         long term = 1000;
100         context.getTermInformation().update(term, null);
101
102         follower = createBehavior(context);
103
104         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
105
106         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
107
108         assertEquals("isVoteGranted", true, reply.isVoteGranted());
109         assertEquals("getTerm", term, reply.getTerm());
110     }
111
112     @Test
113     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
114         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
115
116         RaftActorContext context = createActorContext();
117         long term = 1000;
118         context.getTermInformation().update(term, "test");
119
120         follower = createBehavior(context);
121
122         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
123
124         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
125
126         assertEquals("isVoteGranted", false, reply.isVoteGranted());
127     }
128
129
130     @Test
131     public void testHandleFirstAppendEntries() throws Exception {
132         logStart("testHandleFirstAppendEntries");
133
134         MockRaftActorContext context = createActorContext();
135
136         List<ReplicatedLogEntry> entries = Arrays.asList(
137                 newReplicatedLogEntry(2, 101, "foo"));
138
139         // The new commitIndex is 101
140         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
141
142         follower = createBehavior(context);
143         follower.handleMessage(leaderActor, appendEntries);
144
145         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
146
147         assertFalse(syncStatus.isInitialSyncDone());
148     }
149
150     @Test
151     public void testHandleSyncUpAppendEntries() throws Exception {
152         logStart("testHandleSyncUpAppendEntries");
153
154         MockRaftActorContext context = createActorContext();
155
156         List<ReplicatedLogEntry> entries = Arrays.asList(
157                 newReplicatedLogEntry(2, 101, "foo"));
158
159         // The new commitIndex is 101
160         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
161
162         follower = createBehavior(context);
163         follower.handleMessage(leaderActor, appendEntries);
164
165         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
166
167         assertFalse(syncStatus.isInitialSyncDone());
168
169         // Clear all the messages
170         followerActor.underlyingActor().clear();
171
172         context.setLastApplied(101);
173         context.setCommitIndex(101);
174         setLastLogEntry(context, 1, 101,
175                 new MockRaftActorContext.MockPayload(""));
176
177         entries = Arrays.asList(
178                 newReplicatedLogEntry(2, 101, "foo"));
179
180         // The new commitIndex is 101
181         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
182         follower.handleMessage(leaderActor, appendEntries);
183
184         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
185
186         assertTrue(syncStatus.isInitialSyncDone());
187
188         followerActor.underlyingActor().clear();
189
190         // Sending the same message again should not generate another message
191
192         follower.handleMessage(leaderActor, appendEntries);
193
194         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
195
196         assertNull(syncStatus);
197
198     }
199
200     @Test
201     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
202         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
203
204         MockRaftActorContext context = createActorContext();
205
206         List<ReplicatedLogEntry> entries = Arrays.asList(
207                 newReplicatedLogEntry(2, 101, "foo"));
208
209         // The new commitIndex is 101
210         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
211
212         follower = createBehavior(context);
213         follower.handleMessage(leaderActor, appendEntries);
214
215         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
216
217         assertFalse(syncStatus.isInitialSyncDone());
218
219         // Clear all the messages
220         followerActor.underlyingActor().clear();
221
222         context.setLastApplied(100);
223         setLastLogEntry(context, 1, 100,
224                 new MockRaftActorContext.MockPayload(""));
225
226         entries = Arrays.asList(
227                 newReplicatedLogEntry(2, 101, "foo"));
228
229         // leader-2 is becoming the leader now and it says the commitIndex is 45
230         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
231         follower.handleMessage(leaderActor, appendEntries);
232
233         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
234
235         // We get a new message saying initial status is not done
236         assertFalse(syncStatus.isInitialSyncDone());
237
238     }
239
240
241     @Test
242     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
243         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
244
245         MockRaftActorContext context = createActorContext();
246
247         List<ReplicatedLogEntry> entries = Arrays.asList(
248                 newReplicatedLogEntry(2, 101, "foo"));
249
250         // The new commitIndex is 101
251         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
252
253         follower = createBehavior(context);
254         follower.handleMessage(leaderActor, appendEntries);
255
256         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
257
258         assertFalse(syncStatus.isInitialSyncDone());
259
260         // Clear all the messages
261         followerActor.underlyingActor().clear();
262
263         context.setLastApplied(101);
264         context.setCommitIndex(101);
265         setLastLogEntry(context, 1, 101,
266                 new MockRaftActorContext.MockPayload(""));
267
268         entries = Arrays.asList(
269                 newReplicatedLogEntry(2, 101, "foo"));
270
271         // The new commitIndex is 101
272         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
273         follower.handleMessage(leaderActor, appendEntries);
274
275         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
276
277         assertTrue(syncStatus.isInitialSyncDone());
278
279         // Clear all the messages
280         followerActor.underlyingActor().clear();
281
282         context.setLastApplied(100);
283         setLastLogEntry(context, 1, 100,
284                 new MockRaftActorContext.MockPayload(""));
285
286         entries = Arrays.asList(
287                 newReplicatedLogEntry(2, 101, "foo"));
288
289         // leader-2 is becoming the leader now and it says the commitIndex is 45
290         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
291         follower.handleMessage(leaderActor, appendEntries);
292
293         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
294
295         // We get a new message saying initial status is not done
296         assertFalse(syncStatus.isInitialSyncDone());
297
298     }
299
300
301     /**
302      * This test verifies that when an AppendEntries RPC is received by a RaftActor
303      * with a commitIndex that is greater than what has been applied to the
304      * state machine of the RaftActor, the RaftActor applies the state and
305      * sets it current applied state to the commitIndex of the sender.
306      *
307      * @throws Exception
308      */
309     @Test
310     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
311         logStart("testHandleAppendEntriesWithNewerCommitIndex");
312
313         MockRaftActorContext context = createActorContext();
314
315         context.setLastApplied(100);
316         setLastLogEntry(context, 1, 100,
317                 new MockRaftActorContext.MockPayload(""));
318         context.getReplicatedLog().setSnapshotIndex(99);
319
320         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
321                 newReplicatedLogEntry(2, 101, "foo"));
322
323         // The new commitIndex is 101
324         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
325
326         follower = createBehavior(context);
327         follower.handleMessage(leaderActor, appendEntries);
328
329         assertEquals("getLastApplied", 101L, context.getLastApplied());
330     }
331
332     /**
333      * This test verifies that when an AppendEntries is received a specific prevLogTerm
334      * which does not match the term that is in RaftActors log entry at prevLogIndex
335      * then the RaftActor does not change it's state and it returns a failure.
336      *
337      * @throws Exception
338      */
339     @Test
340     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
341         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
342
343         MockRaftActorContext context = createActorContext();
344
345         // First set the receivers term to lower number
346         context.getTermInformation().update(95, "test");
347
348         // AppendEntries is now sent with a bigger term
349         // this will set the receivers term to be the same as the sender's term
350         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
351
352         follower = createBehavior(context);
353
354         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
355
356         Assert.assertSame(follower, newBehavior);
357
358         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
359                 AppendEntriesReply.class);
360
361         assertEquals("isSuccess", false, reply.isSuccess());
362     }
363
364     /**
365      * This test verifies that when a new AppendEntries message is received with
366      * new entries and the logs of the sender and receiver match that the new
367      * entries get added to the log and the log is incremented by the number of
368      * entries received in appendEntries
369      *
370      * @throws Exception
371      */
372     @Test
373     public void testHandleAppendEntriesAddNewEntries() {
374         logStart("testHandleAppendEntriesAddNewEntries");
375
376         MockRaftActorContext context = createActorContext();
377
378         // First set the receivers term to lower number
379         context.getTermInformation().update(1, "test");
380
381         // Prepare the receivers log
382         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
383         log.append(newReplicatedLogEntry(1, 0, "zero"));
384         log.append(newReplicatedLogEntry(1, 1, "one"));
385         log.append(newReplicatedLogEntry(1, 2, "two"));
386
387         context.setReplicatedLog(log);
388
389         // Prepare the entries to be sent with AppendEntries
390         List<ReplicatedLogEntry> entries = new ArrayList<>();
391         entries.add(newReplicatedLogEntry(1, 3, "three"));
392         entries.add(newReplicatedLogEntry(1, 4, "four"));
393
394         // Send appendEntries with the same term as was set on the receiver
395         // before the new behavior was created (1 in this case)
396         // This will not work for a Candidate because as soon as a Candidate
397         // is created it increments the term
398         AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
399
400         follower = createBehavior(context);
401
402         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
403
404         Assert.assertSame(follower, newBehavior);
405
406         assertEquals("Next index", 5, log.last().getIndex() + 1);
407         assertEquals("Entry 3", entries.get(0), log.get(3));
408         assertEquals("Entry 4", entries.get(1), log.get(4));
409
410         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
411     }
412
413     /**
414      * This test verifies that when a new AppendEntries message is received with
415      * new entries and the logs of the sender and receiver are out-of-sync that
416      * the log is first corrected by removing the out of sync entries from the
417      * log and then adding in the new entries sent with the AppendEntries message
418      */
419     @Test
420     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
421         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
422
423         MockRaftActorContext context = createActorContext();
424
425         // First set the receivers term to lower number
426         context.getTermInformation().update(1, "test");
427
428         // Prepare the receivers log
429         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
430         log.append(newReplicatedLogEntry(1, 0, "zero"));
431         log.append(newReplicatedLogEntry(1, 1, "one"));
432         log.append(newReplicatedLogEntry(1, 2, "two"));
433
434         context.setReplicatedLog(log);
435
436         // Prepare the entries to be sent with AppendEntries
437         List<ReplicatedLogEntry> entries = new ArrayList<>();
438         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
439         entries.add(newReplicatedLogEntry(2, 3, "three"));
440
441         // Send appendEntries with the same term as was set on the receiver
442         // before the new behavior was created (1 in this case)
443         // This will not work for a Candidate because as soon as a Candidate
444         // is created it increments the term
445         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
446
447         follower = createBehavior(context);
448
449         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
450
451         Assert.assertSame(follower, newBehavior);
452
453         // The entry at index 2 will be found out-of-sync with the leader
454         // and will be removed
455         // Then the two new entries will be added to the log
456         // Thus making the log to have 4 entries
457         assertEquals("Next index", 4, log.last().getIndex() + 1);
458         //assertEquals("Entry 2", entries.get(0), log.get(2));
459
460         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
461
462         // Check that the entry at index 2 has the new data
463         assertEquals("Entry 2", entries.get(0), log.get(2));
464
465         assertEquals("Entry 3", entries.get(1), log.get(3));
466
467         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
468     }
469
470     @Test
471     public void testHandleAppendEntriesPreviousLogEntryMissing(){
472         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
473
474         MockRaftActorContext context = createActorContext();
475
476         // Prepare the receivers log
477         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
478         log.append(newReplicatedLogEntry(1, 0, "zero"));
479         log.append(newReplicatedLogEntry(1, 1, "one"));
480         log.append(newReplicatedLogEntry(1, 2, "two"));
481
482         context.setReplicatedLog(log);
483
484         // Prepare the entries to be sent with AppendEntries
485         List<ReplicatedLogEntry> entries = new ArrayList<>();
486         entries.add(newReplicatedLogEntry(1, 4, "four"));
487
488         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
489
490         follower = createBehavior(context);
491
492         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
493
494         Assert.assertSame(follower, newBehavior);
495
496         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
497     }
498
499     @Test
500     public void testHandleAppendEntriesWithExistingLogEntry() {
501         logStart("testHandleAppendEntriesWithExistingLogEntry");
502
503         MockRaftActorContext context = createActorContext();
504
505         context.getTermInformation().update(1, "test");
506
507         // Prepare the receivers log
508         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
509         log.append(newReplicatedLogEntry(1, 0, "zero"));
510         log.append(newReplicatedLogEntry(1, 1, "one"));
511
512         context.setReplicatedLog(log);
513
514         // Send the last entry again.
515         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
516
517         follower = createBehavior(context);
518
519         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
520
521         assertEquals("Next index", 2, log.last().getIndex() + 1);
522         assertEquals("Entry 1", entries.get(0), log.get(1));
523
524         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
525
526         // Send the last entry again and also a new one.
527
528         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
529
530         leaderActor.underlyingActor().clear();
531         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
532
533         assertEquals("Next index", 3, log.last().getIndex() + 1);
534         assertEquals("Entry 1", entries.get(0), log.get(1));
535         assertEquals("Entry 2", entries.get(1), log.get(2));
536
537         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
538     }
539
540     @Test
541     public void testHandleAppendEntriesAfterInstallingSnapshot(){
542         logStart("testHandleAppendAfterInstallingSnapshot");
543
544         MockRaftActorContext context = createActorContext();
545
546         // Prepare the receivers log
547         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
548
549         // Set up a log as if it has been snapshotted
550         log.setSnapshotIndex(3);
551         log.setSnapshotTerm(1);
552
553         context.setReplicatedLog(log);
554
555         // Prepare the entries to be sent with AppendEntries
556         List<ReplicatedLogEntry> entries = new ArrayList<>();
557         entries.add(newReplicatedLogEntry(1, 4, "four"));
558
559         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
560
561         follower = createBehavior(context);
562
563         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
564
565         Assert.assertSame(follower, newBehavior);
566
567         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
568     }
569
570
571     /**
572      * This test verifies that when InstallSnapshot is received by
573      * the follower its applied correctly.
574      *
575      * @throws Exception
576      */
577     @Test
578     public void testHandleInstallSnapshot() throws Exception {
579         logStart("testHandleInstallSnapshot");
580
581         MockRaftActorContext context = createActorContext();
582
583         follower = createBehavior(context);
584
585         ByteString bsSnapshot  = createSnapshot();
586         int offset = 0;
587         int snapshotLength = bsSnapshot.size();
588         int chunkSize = 50;
589         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
590         int lastIncludedIndex = 1;
591         int chunkIndex = 1;
592         InstallSnapshot lastInstallSnapshot = null;
593
594         for(int i = 0; i < totalChunks; i++) {
595             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
596             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
597                     chunkData, chunkIndex, totalChunks);
598             follower.handleMessage(leaderActor, lastInstallSnapshot);
599             offset = offset + 50;
600             lastIncludedIndex++;
601             chunkIndex++;
602         }
603
604         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
605                 ApplySnapshot.class);
606         Snapshot snapshot = applySnapshot.getSnapshot();
607         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
608         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
609                 snapshot.getLastAppliedTerm());
610         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
611                 snapshot.getLastAppliedIndex());
612         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
613         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
614
615         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
616                 leaderActor, InstallSnapshotReply.class);
617         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
618
619         chunkIndex = 1;
620         for(InstallSnapshotReply reply: replies) {
621             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
622             assertEquals("getTerm", 1, reply.getTerm());
623             assertEquals("isSuccess", true, reply.isSuccess());
624             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
625         }
626
627         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
628     }
629
630
631     /**
632      * Verify that when an AppendEntries is sent to a follower during a snapshot install
633      * the Follower short-circuits the processing of the AppendEntries message.
634      *
635      * @throws Exception
636      */
637     @Test
638     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
639         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
640
641         MockRaftActorContext context = createActorContext();
642
643         follower = createBehavior(context);
644
645         ByteString bsSnapshot  = createSnapshot();
646         int snapshotLength = bsSnapshot.size();
647         int chunkSize = 50;
648         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
649         int lastIncludedIndex = 1;
650
651         // Check that snapshot installation is not in progress
652         assertNull(((Follower) follower).getSnapshotTracker());
653
654         // Make sure that we have more than 1 chunk to send
655         assertTrue(totalChunks > 1);
656
657         // Send an install snapshot with the first chunk to start the process of installing a snapshot
658         ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
659         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
660                 chunkData, 1, totalChunks));
661
662         // Check if snapshot installation is in progress now
663         assertNotNull(((Follower) follower).getSnapshotTracker());
664
665         // Send an append entry
666         AppendEntries appendEntries = mock(AppendEntries.class);
667         doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
668
669         follower.handleMessage(leaderActor, appendEntries);
670
671         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
672         assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
673         assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
674         assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
675
676         // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
677         verify(appendEntries, never()).getPrevLogIndex();
678
679     }
680
681     @Test
682     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
683         logStart("testInitialSyncUpWithHandleInstallSnapshot");
684
685         MockRaftActorContext context = createActorContext();
686
687         follower = createBehavior(context);
688
689         ByteString bsSnapshot  = createSnapshot();
690         int offset = 0;
691         int snapshotLength = bsSnapshot.size();
692         int chunkSize = 50;
693         int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
694         int lastIncludedIndex = 1;
695         int chunkIndex = 1;
696         InstallSnapshot lastInstallSnapshot = null;
697
698         for(int i = 0; i < totalChunks; i++) {
699             ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
700             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
701                     chunkData, chunkIndex, totalChunks);
702             follower.handleMessage(leaderActor, lastInstallSnapshot);
703             offset = offset + 50;
704             lastIncludedIndex++;
705             chunkIndex++;
706         }
707
708         FollowerInitialSyncUpStatus syncStatus =
709                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
710
711         assertFalse(syncStatus.isInitialSyncDone());
712
713         // Clear all the messages
714         followerActor.underlyingActor().clear();
715
716         context.setLastApplied(101);
717         context.setCommitIndex(101);
718         setLastLogEntry(context, 1, 101,
719                 new MockRaftActorContext.MockPayload(""));
720
721         List<ReplicatedLogEntry> entries = Arrays.asList(
722                 newReplicatedLogEntry(2, 101, "foo"));
723
724         // The new commitIndex is 101
725         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
726         follower.handleMessage(leaderActor, appendEntries);
727
728         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
729
730         assertTrue(syncStatus.isInitialSyncDone());
731     }
732
733     @Test
734     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
735         logStart("testHandleOutOfSequenceInstallSnapshot");
736
737         MockRaftActorContext context = createActorContext();
738
739         follower = createBehavior(context);
740
741         ByteString bsSnapshot = createSnapshot();
742
743         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
744                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
745         follower.handleMessage(leaderActor, installSnapshot);
746
747         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
748                 InstallSnapshotReply.class);
749
750         assertEquals("isSuccess", false, reply.isSuccess());
751         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
752         assertEquals("getTerm", 1, reply.getTerm());
753         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
754
755         assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
756     }
757
758     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
759         int snapshotLength = bs.size();
760         int start = offset;
761         int size = chunkSize;
762         if (chunkSize > snapshotLength) {
763             size = snapshotLength;
764         } else {
765             if ((start + chunkSize) > snapshotLength) {
766                 size = snapshotLength - start;
767             }
768         }
769         return bs.substring(start, start + size);
770     }
771
772     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
773             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
774
775         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
776                 AppendEntriesReply.class);
777
778         assertEquals("isSuccess", expSuccess, reply.isSuccess());
779         assertEquals("getTerm", expTerm, reply.getTerm());
780         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
781         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
782         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
783     }
784
785     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
786         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
787                 new MockRaftActorContext.MockPayload(data));
788     }
789
790     private ByteString createSnapshot(){
791         HashMap<String, String> followerSnapshot = new HashMap<>();
792         followerSnapshot.put("1", "A");
793         followerSnapshot.put("2", "B");
794         followerSnapshot.put("3", "C");
795
796         return toByteString(followerSnapshot);
797     }
798
799     @Override
800     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
801             ActorRef actorRef, RaftRPC rpc) throws Exception {
802         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
803
804         String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
805         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
806     }
807
808     @Override
809     protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
810             throws Exception {
811         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
812         assertEquals("isSuccess", true, reply.isSuccess());
813     }
814 }