Use SnapshotManager
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
50     static final String FOLLOWER_ID = "follower";
51
52     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
54
55     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
57
58     private Leader leader;
59
60     @Override
61     @After
62     public void tearDown() throws Exception {
63         if(leader != null) {
64             leader.close();
65         }
66
67         super.tearDown();
68     }
69
70     @Test
71     public void testHandleMessageForUnknownMessage() throws Exception {
72         logStart("testHandleMessageForUnknownMessage");
73
74         leader = new Leader(createActorContext());
75
76         // handle message should return the Leader state when it receives an
77         // unknown message
78         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79         Assert.assertTrue(behavior instanceof Leader);
80     }
81
82     @Test
83     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
85
86         MockRaftActorContext actorContext = createActorContextWithFollower();
87
88         long term = 1;
89         actorContext.getTermInformation().update(term, "");
90
91         leader = new Leader(actorContext);
92
93         // Leader should send an immediate heartbeat with no entries as follower is inactive.
94         long lastIndex = actorContext.getReplicatedLog().lastIndex();
95         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96         assertEquals("getTerm", term, appendEntries.getTerm());
97         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99         assertEquals("Entries size", 0, appendEntries.getEntries().size());
100
101         // The follower would normally reply - simulate that explicitly here.
102         leader.handleMessage(followerActor, new AppendEntriesReply(
103                 FOLLOWER_ID, term, true, lastIndex - 1, term));
104         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
105
106         followerActor.underlyingActor().clear();
107
108         // Sleep for the heartbeat interval so AppendEntries is sent.
109         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
111
112         leader.handleMessage(leaderActor, new SendHeartBeat());
113
114         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117         assertEquals("Entries size", 1, appendEntries.getEntries().size());
118         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
120     }
121
122
123     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
124         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
125         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
126                 1, index, payload);
127         actorContext.getReplicatedLog().append(newEntry);
128         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
129     }
130
131     @Test
132     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
133         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
134
135         MockRaftActorContext actorContext = createActorContextWithFollower();
136
137         long term = 1;
138         actorContext.getTermInformation().update(term, "");
139
140         leader = new Leader(actorContext);
141
142         // Leader will send an immediate heartbeat - ignore it.
143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
144
145         // The follower would normally reply - simulate that explicitly here.
146         long lastIndex = actorContext.getReplicatedLog().lastIndex();
147         leader.handleMessage(followerActor, new AppendEntriesReply(
148                 FOLLOWER_ID, term, true, lastIndex, term));
149         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
150
151         followerActor.underlyingActor().clear();
152
153         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155                 1, lastIndex + 1, payload);
156         actorContext.getReplicatedLog().append(newEntry);
157         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
158
159         // State should not change
160         assertTrue(raftBehavior instanceof Leader);
161
162         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
163         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
164         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
165         assertEquals("Entries size", 1, appendEntries.getEntries().size());
166         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
167         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
168         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
169     }
170
171     @Test
172     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
173         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
174
175         MockRaftActorContext actorContext = createActorContextWithFollower();
176         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
177             @Override
178             public FiniteDuration getHeartBeatInterval() {
179                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
180             }
181         });
182
183         long term = 1;
184         actorContext.getTermInformation().update(term, "");
185
186         leader = new Leader(actorContext);
187
188         // Leader will send an immediate heartbeat - ignore it.
189         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190
191         // The follower would normally reply - simulate that explicitly here.
192         long lastIndex = actorContext.getReplicatedLog().lastIndex();
193         leader.handleMessage(followerActor, new AppendEntriesReply(
194                 FOLLOWER_ID, term, true, lastIndex, term));
195         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
196
197         followerActor.underlyingActor().clear();
198
199         for(int i=0;i<5;i++) {
200             sendReplicate(actorContext, lastIndex+i+1);
201         }
202
203         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
204         // We expect only 1 message to be sent because of two reasons,
205         // - an append entries reply was not received
206         // - the heartbeat interval has not expired
207         // In this scenario if multiple messages are sent they would likely be duplicates
208         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
209     }
210
211     @Test
212     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
213         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
214
215         MockRaftActorContext actorContext = createActorContextWithFollower();
216         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
217             @Override
218             public FiniteDuration getHeartBeatInterval() {
219                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
220             }
221         });
222
223         long term = 1;
224         actorContext.getTermInformation().update(term, "");
225
226         leader = new Leader(actorContext);
227
228         // Leader will send an immediate heartbeat - ignore it.
229         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
230
231         // The follower would normally reply - simulate that explicitly here.
232         long lastIndex = actorContext.getReplicatedLog().lastIndex();
233         leader.handleMessage(followerActor, new AppendEntriesReply(
234                 FOLLOWER_ID, term, true, lastIndex, term));
235         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
236
237         followerActor.underlyingActor().clear();
238
239         for(int i=0;i<3;i++) {
240             sendReplicate(actorContext, lastIndex+i+1);
241             leader.handleMessage(followerActor, new AppendEntriesReply(
242                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
243
244         }
245
246         for(int i=3;i<5;i++) {
247             sendReplicate(actorContext, lastIndex + i + 1);
248         }
249
250         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
251         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
252         // get sent to the follower - but not the 5th
253         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
254
255         for(int i=0;i<4;i++) {
256             long expected = allMessages.get(i).getEntries().get(0).getIndex();
257             assertEquals(expected, i+2);
258         }
259     }
260
261     @Test
262     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
263         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
264
265         MockRaftActorContext actorContext = createActorContextWithFollower();
266         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
267             @Override
268             public FiniteDuration getHeartBeatInterval() {
269                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
270             }
271         });
272
273         long term = 1;
274         actorContext.getTermInformation().update(term, "");
275
276         leader = new Leader(actorContext);
277
278         // Leader will send an immediate heartbeat - ignore it.
279         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
280
281         // The follower would normally reply - simulate that explicitly here.
282         long lastIndex = actorContext.getReplicatedLog().lastIndex();
283         leader.handleMessage(followerActor, new AppendEntriesReply(
284                 FOLLOWER_ID, term, true, lastIndex, term));
285         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
286
287         followerActor.underlyingActor().clear();
288
289         sendReplicate(actorContext, lastIndex+1);
290
291         // Wait slightly longer than heartbeat duration
292         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
293
294         leader.handleMessage(leaderActor, new SendHeartBeat());
295
296         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
297         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
298
299         assertEquals(1, allMessages.get(0).getEntries().size());
300         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
301         assertEquals(1, allMessages.get(1).getEntries().size());
302         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
303
304     }
305
306     @Test
307     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
308         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
309
310         MockRaftActorContext actorContext = createActorContextWithFollower();
311         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312             @Override
313             public FiniteDuration getHeartBeatInterval() {
314                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
315             }
316         });
317
318         long term = 1;
319         actorContext.getTermInformation().update(term, "");
320
321         leader = new Leader(actorContext);
322
323         // Leader will send an immediate heartbeat - ignore it.
324         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325
326         // The follower would normally reply - simulate that explicitly here.
327         long lastIndex = actorContext.getReplicatedLog().lastIndex();
328         leader.handleMessage(followerActor, new AppendEntriesReply(
329                 FOLLOWER_ID, term, true, lastIndex, term));
330         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
331
332         followerActor.underlyingActor().clear();
333
334         for(int i=0;i<3;i++) {
335             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
336             leader.handleMessage(leaderActor, new SendHeartBeat());
337         }
338
339         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
340         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
341     }
342
343     @Test
344     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
345         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
346
347         MockRaftActorContext actorContext = createActorContextWithFollower();
348         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
349             @Override
350             public FiniteDuration getHeartBeatInterval() {
351                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
352             }
353         });
354
355         long term = 1;
356         actorContext.getTermInformation().update(term, "");
357
358         leader = new Leader(actorContext);
359
360         // Leader will send an immediate heartbeat - ignore it.
361         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
362
363         // The follower would normally reply - simulate that explicitly here.
364         long lastIndex = actorContext.getReplicatedLog().lastIndex();
365         leader.handleMessage(followerActor, new AppendEntriesReply(
366                 FOLLOWER_ID, term, true, lastIndex, term));
367         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
368
369         followerActor.underlyingActor().clear();
370
371         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
372         leader.handleMessage(leaderActor, new SendHeartBeat());
373         sendReplicate(actorContext, lastIndex+1);
374
375         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
376         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
377
378         assertEquals(0, allMessages.get(0).getEntries().size());
379         assertEquals(1, allMessages.get(1).getEntries().size());
380     }
381
382
383     @Test
384     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
385         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
386
387         MockRaftActorContext actorContext = createActorContext();
388
389         leader = new Leader(actorContext);
390
391         actorContext.setLastApplied(0);
392
393         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
394         long term = actorContext.getTermInformation().getCurrentTerm();
395         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
396                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
397
398         actorContext.getReplicatedLog().append(newEntry);
399
400         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
401                 new Replicate(leaderActor, "state-id", newEntry));
402
403         // State should not change
404         assertTrue(raftBehavior instanceof Leader);
405
406         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
407
408         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
409         // one since lastApplied state is 0.
410         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
411                 leaderActor, ApplyState.class);
412         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
413
414         for(int i = 0; i <= newLogIndex - 1; i++ ) {
415             ApplyState applyState = applyStateList.get(i);
416             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
417             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
418         }
419
420         ApplyState last = applyStateList.get((int) newLogIndex - 1);
421         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
422         assertEquals("getIdentifier", "state-id", last.getIdentifier());
423     }
424
425     @Test
426     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
427         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
428
429         MockRaftActorContext actorContext = createActorContextWithFollower();
430
431         Map<String, String> leadersSnapshot = new HashMap<>();
432         leadersSnapshot.put("1", "A");
433         leadersSnapshot.put("2", "B");
434         leadersSnapshot.put("3", "C");
435
436         //clears leaders log
437         actorContext.getReplicatedLog().removeFrom(0);
438
439         final int followersLastIndex = 2;
440         final int snapshotIndex = 3;
441         final int newEntryIndex = 4;
442         final int snapshotTerm = 1;
443         final int currentTerm = 2;
444
445         // set the snapshot variables in replicatedlog
446         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
447         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
448         actorContext.setCommitIndex(followersLastIndex);
449         //set follower timeout to 2 mins, helps during debugging
450         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
451
452         leader = new Leader(actorContext);
453
454         // new entry
455         ReplicatedLogImplEntry entry =
456                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
457                         new MockRaftActorContext.MockPayload("D"));
458
459         //update follower timestamp
460         leader.markFollowerActive(FOLLOWER_ID);
461
462         ByteString bs = toByteString(leadersSnapshot);
463         leader.setSnapshot(Optional.of(bs));
464         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
465         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
466
467         //send first chunk and no InstallSnapshotReply received yet
468         fts.getNextChunk();
469         fts.incrementChunkIndex();
470
471         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
472                 TimeUnit.MILLISECONDS);
473
474         leader.handleMessage(leaderActor, new SendHeartBeat());
475
476         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
477
478         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
479
480         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
481
482         //InstallSnapshotReply received
483         fts.markSendStatus(true);
484
485         leader.handleMessage(leaderActor, new SendHeartBeat());
486
487         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
488
489         assertEquals(snapshotIndex, is.getLastIncludedIndex());
490     }
491
492     @Test
493     public void testSendAppendEntriesSnapshotScenario() throws Exception {
494         logStart("testSendAppendEntriesSnapshotScenario");
495
496         MockRaftActorContext actorContext = createActorContextWithFollower();
497
498         Map<String, String> leadersSnapshot = new HashMap<>();
499         leadersSnapshot.put("1", "A");
500         leadersSnapshot.put("2", "B");
501         leadersSnapshot.put("3", "C");
502
503         //clears leaders log
504         actorContext.getReplicatedLog().removeFrom(0);
505
506         final int followersLastIndex = 2;
507         final int snapshotIndex = 3;
508         final int newEntryIndex = 4;
509         final int snapshotTerm = 1;
510         final int currentTerm = 2;
511
512         // set the snapshot variables in replicatedlog
513         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
514         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
515         actorContext.setCommitIndex(followersLastIndex);
516
517         leader = new Leader(actorContext);
518
519         // Leader will send an immediate heartbeat - ignore it.
520         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
521
522         // new entry
523         ReplicatedLogImplEntry entry =
524                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
525                         new MockRaftActorContext.MockPayload("D"));
526
527         actorContext.getReplicatedLog().append(entry);
528
529         //update follower timestamp
530         leader.markFollowerActive(FOLLOWER_ID);
531
532         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
533         RaftActorBehavior raftBehavior = leader.handleMessage(
534                 leaderActor, new Replicate(null, "state-id", entry));
535
536         assertTrue(raftBehavior instanceof Leader);
537
538         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
539     }
540
541     @Test
542     public void testInitiateInstallSnapshot() throws Exception {
543         logStart("testInitiateInstallSnapshot");
544
545         MockRaftActorContext actorContext = createActorContextWithFollower();
546
547         Map<String, String> leadersSnapshot = new HashMap<>();
548         leadersSnapshot.put("1", "A");
549         leadersSnapshot.put("2", "B");
550         leadersSnapshot.put("3", "C");
551
552         //clears leaders log
553         actorContext.getReplicatedLog().removeFrom(0);
554
555         final int followersLastIndex = 2;
556         final int snapshotIndex = 3;
557         final int newEntryIndex = 4;
558         final int snapshotTerm = 1;
559         final int currentTerm = 2;
560
561         // set the snapshot variables in replicatedlog
562         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
563         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
564         actorContext.setLastApplied(3);
565         actorContext.setCommitIndex(followersLastIndex);
566
567         leader = new Leader(actorContext);
568
569         // Leader will send an immediate heartbeat - ignore it.
570         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
571
572         // set the snapshot as absent and check if capture-snapshot is invoked.
573         leader.setSnapshot(Optional.<ByteString>absent());
574
575         // new entry
576         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
577                 new MockRaftActorContext.MockPayload("D"));
578
579         actorContext.getReplicatedLog().append(entry);
580
581         //update follower timestamp
582         leader.markFollowerActive(FOLLOWER_ID);
583
584         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
585
586         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
587
588         assertTrue(cs.isInstallSnapshotInitiated());
589         assertEquals(3, cs.getLastAppliedIndex());
590         assertEquals(1, cs.getLastAppliedTerm());
591         assertEquals(4, cs.getLastIndex());
592         assertEquals(2, cs.getLastTerm());
593
594         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
595         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
596
597         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
598         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
599     }
600
601     @Test
602     public void testInstallSnapshot() throws Exception {
603         logStart("testInstallSnapshot");
604
605         MockRaftActorContext actorContext = createActorContextWithFollower();
606
607         Map<String, String> leadersSnapshot = new HashMap<>();
608         leadersSnapshot.put("1", "A");
609         leadersSnapshot.put("2", "B");
610         leadersSnapshot.put("3", "C");
611
612         //clears leaders log
613         actorContext.getReplicatedLog().removeFrom(0);
614
615         final int followersLastIndex = 2;
616         final int snapshotIndex = 3;
617         final int snapshotTerm = 1;
618         final int currentTerm = 2;
619
620         // set the snapshot variables in replicatedlog
621         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
622         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
623         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
624         actorContext.setCommitIndex(followersLastIndex);
625
626         leader = new Leader(actorContext);
627
628         // Ignore initial heartbeat.
629         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
630
631         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
632                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
633
634         assertTrue(raftBehavior instanceof Leader);
635
636         // check if installsnapshot gets called with the correct values.
637
638         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
639
640         assertNotNull(installSnapshot.getData());
641         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
642         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
643
644         assertEquals(currentTerm, installSnapshot.getTerm());
645     }
646
647     @Test
648     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
649         logStart("testHandleInstallSnapshotReplyLastChunk");
650
651         MockRaftActorContext actorContext = createActorContextWithFollower();
652
653         final int followersLastIndex = 2;
654         final int snapshotIndex = 3;
655         final int snapshotTerm = 1;
656         final int currentTerm = 2;
657
658         actorContext.setCommitIndex(followersLastIndex);
659
660         leader = new Leader(actorContext);
661
662         // Ignore initial heartbeat.
663         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
664
665         Map<String, String> leadersSnapshot = new HashMap<>();
666         leadersSnapshot.put("1", "A");
667         leadersSnapshot.put("2", "B");
668         leadersSnapshot.put("3", "C");
669
670         // set the snapshot variables in replicatedlog
671
672         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
673         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
674         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
675
676         ByteString bs = toByteString(leadersSnapshot);
677         leader.setSnapshot(Optional.of(bs));
678         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
679         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
680         while(!fts.isLastChunk(fts.getChunkIndex())) {
681             fts.getNextChunk();
682             fts.incrementChunkIndex();
683         }
684
685         //clears leaders log
686         actorContext.getReplicatedLog().removeFrom(0);
687
688         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
689                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
690
691         assertTrue(raftBehavior instanceof Leader);
692
693         assertEquals(0, leader.followerSnapshotSize());
694         assertEquals(1, leader.followerLogSize());
695         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
696         assertNotNull(fli);
697         assertEquals(snapshotIndex, fli.getMatchIndex());
698         assertEquals(snapshotIndex, fli.getMatchIndex());
699         assertEquals(snapshotIndex + 1, fli.getNextIndex());
700     }
701
702     @Test
703     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
704         logStart("testSendSnapshotfromInstallSnapshotReply");
705
706         MockRaftActorContext actorContext = createActorContextWithFollower();
707
708         final int followersLastIndex = 2;
709         final int snapshotIndex = 3;
710         final int snapshotTerm = 1;
711         final int currentTerm = 2;
712
713         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
714             @Override
715             public int getSnapshotChunkSize() {
716                 return 50;
717             }
718         };
719         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
720         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
721
722         actorContext.setConfigParams(configParams);
723         actorContext.setCommitIndex(followersLastIndex);
724
725         leader = new Leader(actorContext);
726
727         Map<String, String> leadersSnapshot = new HashMap<>();
728         leadersSnapshot.put("1", "A");
729         leadersSnapshot.put("2", "B");
730         leadersSnapshot.put("3", "C");
731
732         // set the snapshot variables in replicatedlog
733         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
734         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
735         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
736
737         ByteString bs = toByteString(leadersSnapshot);
738         leader.setSnapshot(Optional.of(bs));
739
740         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
741
742         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
743
744         assertEquals(1, installSnapshot.getChunkIndex());
745         assertEquals(3, installSnapshot.getTotalChunks());
746
747         followerActor.underlyingActor().clear();
748         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
749                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
750
751         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
752
753         assertEquals(2, installSnapshot.getChunkIndex());
754         assertEquals(3, installSnapshot.getTotalChunks());
755
756         followerActor.underlyingActor().clear();
757         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
758                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
759
760         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
761
762         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
763         followerActor.underlyingActor().clear();
764         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
765                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
766
767         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
768
769         Assert.assertNull(installSnapshot);
770     }
771
772
773     @Test
774     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
775         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
776
777         MockRaftActorContext actorContext = createActorContextWithFollower();
778
779         final int followersLastIndex = 2;
780         final int snapshotIndex = 3;
781         final int snapshotTerm = 1;
782         final int currentTerm = 2;
783
784         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
785             @Override
786             public int getSnapshotChunkSize() {
787                 return 50;
788             }
789         });
790
791         actorContext.setCommitIndex(followersLastIndex);
792
793         leader = new Leader(actorContext);
794
795         Map<String, String> leadersSnapshot = new HashMap<>();
796         leadersSnapshot.put("1", "A");
797         leadersSnapshot.put("2", "B");
798         leadersSnapshot.put("3", "C");
799
800         // set the snapshot variables in replicatedlog
801         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804
805         ByteString bs = toByteString(leadersSnapshot);
806         leader.setSnapshot(Optional.of(bs));
807
808         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
809         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
810
811         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
812
813         assertEquals(1, installSnapshot.getChunkIndex());
814         assertEquals(3, installSnapshot.getTotalChunks());
815
816         followerActor.underlyingActor().clear();
817
818         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
819                 FOLLOWER_ID, -1, false));
820
821         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
822                 TimeUnit.MILLISECONDS);
823
824         leader.handleMessage(leaderActor, new SendHeartBeat());
825
826         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
827
828         assertEquals(1, installSnapshot.getChunkIndex());
829         assertEquals(3, installSnapshot.getTotalChunks());
830     }
831
832     @Test
833     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
834         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
835
836         MockRaftActorContext actorContext = createActorContextWithFollower();
837
838         final int followersLastIndex = 2;
839         final int snapshotIndex = 3;
840         final int snapshotTerm = 1;
841         final int currentTerm = 2;
842
843         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
844             @Override
845             public int getSnapshotChunkSize() {
846                 return 50;
847             }
848         });
849
850         actorContext.setCommitIndex(followersLastIndex);
851
852         leader = new Leader(actorContext);
853
854         Map<String, String> leadersSnapshot = new HashMap<>();
855         leadersSnapshot.put("1", "A");
856         leadersSnapshot.put("2", "B");
857         leadersSnapshot.put("3", "C");
858
859         // set the snapshot variables in replicatedlog
860         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
861         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
862         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
863
864         ByteString bs = toByteString(leadersSnapshot);
865         leader.setSnapshot(Optional.of(bs));
866
867         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
868
869         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
870
871         assertEquals(1, installSnapshot.getChunkIndex());
872         assertEquals(3, installSnapshot.getTotalChunks());
873         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
874
875         int hashCode = installSnapshot.getData().hashCode();
876
877         followerActor.underlyingActor().clear();
878
879         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
880                 FOLLOWER_ID, 1, true));
881
882         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
883
884         assertEquals(2, installSnapshot.getChunkIndex());
885         assertEquals(3, installSnapshot.getTotalChunks());
886         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
887     }
888
889     @Test
890     public void testFollowerToSnapshotLogic() {
891         logStart("testFollowerToSnapshotLogic");
892
893         MockRaftActorContext actorContext = createActorContext();
894
895         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
896             @Override
897             public int getSnapshotChunkSize() {
898                 return 50;
899             }
900         });
901
902         leader = new Leader(actorContext);
903
904         Map<String, String> leadersSnapshot = new HashMap<>();
905         leadersSnapshot.put("1", "A");
906         leadersSnapshot.put("2", "B");
907         leadersSnapshot.put("3", "C");
908
909         ByteString bs = toByteString(leadersSnapshot);
910         byte[] barray = bs.toByteArray();
911
912         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
913         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
914
915         assertEquals(bs.size(), barray.length);
916
917         int chunkIndex=0;
918         for (int i=0; i < barray.length; i = i + 50) {
919             int j = i + 50;
920             chunkIndex++;
921
922             if (i + 50 > barray.length) {
923                 j = barray.length;
924             }
925
926             ByteString chunk = fts.getNextChunk();
927             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
928             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
929
930             fts.markSendStatus(true);
931             if (!fts.isLastChunk(chunkIndex)) {
932                 fts.incrementChunkIndex();
933             }
934         }
935
936         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
937     }
938
939     @Override protected RaftActorBehavior createBehavior(
940         RaftActorContext actorContext) {
941         return new Leader(actorContext);
942     }
943
944     @Override
945     protected MockRaftActorContext createActorContext() {
946         return createActorContext(leaderActor);
947     }
948
949     @Override
950     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
951         return createActorContext("leader", actorRef);
952     }
953
954     private MockRaftActorContext createActorContextWithFollower() {
955         MockRaftActorContext actorContext = createActorContext();
956         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
957                 followerActor.path().toString()).build());
958         return actorContext;
959     }
960
961     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
962         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
963         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
964         configParams.setElectionTimeoutFactor(100000);
965         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
966         context.setConfigParams(configParams);
967         return context;
968     }
969
970     @Test
971     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
972         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
973
974         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
975
976         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
977
978         Follower follower = new Follower(followerActorContext);
979         followerActor.underlyingActor().setBehavior(follower);
980
981         Map<String, String> peerAddresses = new HashMap<>();
982         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
983
984         leaderActorContext.setPeerAddresses(peerAddresses);
985
986         leaderActorContext.getReplicatedLog().removeFrom(0);
987
988         //create 3 entries
989         leaderActorContext.setReplicatedLog(
990                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
991
992         leaderActorContext.setCommitIndex(1);
993
994         followerActorContext.getReplicatedLog().removeFrom(0);
995
996         // follower too has the exact same log entries and has the same commit index
997         followerActorContext.setReplicatedLog(
998                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
999
1000         followerActorContext.setCommitIndex(1);
1001
1002         leader = new Leader(leaderActorContext);
1003
1004         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1005
1006         assertEquals(1, appendEntries.getLeaderCommit());
1007         assertEquals(0, appendEntries.getEntries().size());
1008         assertEquals(0, appendEntries.getPrevLogIndex());
1009
1010         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1011                 leaderActor, AppendEntriesReply.class);
1012
1013         assertEquals(2, appendEntriesReply.getLogLastIndex());
1014         assertEquals(1, appendEntriesReply.getLogLastTerm());
1015
1016         // follower returns its next index
1017         assertEquals(2, appendEntriesReply.getLogLastIndex());
1018         assertEquals(1, appendEntriesReply.getLogLastTerm());
1019
1020         follower.close();
1021     }
1022
1023     @Test
1024     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1025         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1026
1027         MockRaftActorContext leaderActorContext = createActorContext();
1028
1029         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1030
1031         Follower follower = new Follower(followerActorContext);
1032         followerActor.underlyingActor().setBehavior(follower);
1033
1034         Map<String, String> peerAddresses = new HashMap<>();
1035         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1036
1037         leaderActorContext.setPeerAddresses(peerAddresses);
1038
1039         leaderActorContext.getReplicatedLog().removeFrom(0);
1040
1041         leaderActorContext.setReplicatedLog(
1042                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1043
1044         leaderActorContext.setCommitIndex(1);
1045
1046         followerActorContext.getReplicatedLog().removeFrom(0);
1047
1048         followerActorContext.setReplicatedLog(
1049                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1050
1051         // follower has the same log entries but its commit index > leaders commit index
1052         followerActorContext.setCommitIndex(2);
1053
1054         leader = new Leader(leaderActorContext);
1055
1056         // Initial heartbeat
1057         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1058
1059         assertEquals(1, appendEntries.getLeaderCommit());
1060         assertEquals(0, appendEntries.getEntries().size());
1061         assertEquals(0, appendEntries.getPrevLogIndex());
1062
1063         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1064                 leaderActor, AppendEntriesReply.class);
1065
1066         assertEquals(2, appendEntriesReply.getLogLastIndex());
1067         assertEquals(1, appendEntriesReply.getLogLastTerm());
1068
1069         leaderActor.underlyingActor().setBehavior(follower);
1070         leader.handleMessage(followerActor, appendEntriesReply);
1071
1072         leaderActor.underlyingActor().clear();
1073         followerActor.underlyingActor().clear();
1074
1075         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076                 TimeUnit.MILLISECONDS);
1077
1078         leader.handleMessage(leaderActor, new SendHeartBeat());
1079
1080         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1081
1082         assertEquals(2, appendEntries.getLeaderCommit());
1083         assertEquals(0, appendEntries.getEntries().size());
1084         assertEquals(2, appendEntries.getPrevLogIndex());
1085
1086         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1087
1088         assertEquals(2, appendEntriesReply.getLogLastIndex());
1089         assertEquals(1, appendEntriesReply.getLogLastTerm());
1090
1091         assertEquals(2, followerActorContext.getCommitIndex());
1092
1093         follower.close();
1094     }
1095
1096     @Test
1097     public void testHandleAppendEntriesReplyFailure(){
1098         logStart("testHandleAppendEntriesReplyFailure");
1099
1100         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1101
1102         leader = new Leader(leaderActorContext);
1103
1104         // Send initial heartbeat reply with last index.
1105         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1106
1107         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1108         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1109
1110         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1111
1112         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1113
1114         assertEquals(RaftState.Leader, raftActorBehavior.state());
1115
1116         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1117     }
1118
1119     @Test
1120     public void testHandleAppendEntriesReplySuccess() throws Exception {
1121         logStart("testHandleAppendEntriesReplySuccess");
1122
1123         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1124
1125         leaderActorContext.setReplicatedLog(
1126                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1127
1128         leaderActorContext.setCommitIndex(1);
1129         leaderActorContext.setLastApplied(1);
1130         leaderActorContext.getTermInformation().update(1, "leader");
1131
1132         leader = new Leader(leaderActorContext);
1133
1134         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1135
1136         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1137
1138         assertEquals(RaftState.Leader, raftActorBehavior.state());
1139
1140         assertEquals(2, leaderActorContext.getCommitIndex());
1141
1142         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1143                 leaderActor, ApplyJournalEntries.class);
1144
1145         assertEquals(2, leaderActorContext.getLastApplied());
1146
1147         assertEquals(2, applyJournalEntries.getToIndex());
1148
1149         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1150                 ApplyState.class);
1151
1152         assertEquals(1,applyStateList.size());
1153
1154         ApplyState applyState = applyStateList.get(0);
1155
1156         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1157     }
1158
1159     @Test
1160     public void testHandleAppendEntriesReplyUnknownFollower(){
1161         logStart("testHandleAppendEntriesReplyUnknownFollower");
1162
1163         MockRaftActorContext leaderActorContext = createActorContext();
1164
1165         leader = new Leader(leaderActorContext);
1166
1167         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1168
1169         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1170
1171         assertEquals(RaftState.Leader, raftActorBehavior.state());
1172     }
1173
1174     @Test
1175     public void testHandleRequestVoteReply(){
1176         logStart("testHandleRequestVoteReply");
1177
1178         MockRaftActorContext leaderActorContext = createActorContext();
1179
1180         leader = new Leader(leaderActorContext);
1181
1182         // Should be a no-op.
1183         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1184                 new RequestVoteReply(1, true));
1185
1186         assertEquals(RaftState.Leader, raftActorBehavior.state());
1187
1188         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1189
1190         assertEquals(RaftState.Leader, raftActorBehavior.state());
1191     }
1192
1193     @Test
1194     public void testIsolatedLeaderCheckNoFollowers() {
1195         logStart("testIsolatedLeaderCheckNoFollowers");
1196
1197         MockRaftActorContext leaderActorContext = createActorContext();
1198
1199         leader = new Leader(leaderActorContext);
1200         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1201         Assert.assertTrue(behavior instanceof Leader);
1202     }
1203
1204     @Test
1205     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1206         logStart("testIsolatedLeaderCheckTwoFollowers");
1207
1208         new JavaTestKit(getSystem()) {{
1209
1210             ActorRef followerActor1 = getTestActor();
1211             ActorRef followerActor2 = getTestActor();
1212
1213             MockRaftActorContext leaderActorContext = createActorContext();
1214
1215             Map<String, String> peerAddresses = new HashMap<>();
1216             peerAddresses.put("follower-1", followerActor1.path().toString());
1217             peerAddresses.put("follower-2", followerActor2.path().toString());
1218
1219             leaderActorContext.setPeerAddresses(peerAddresses);
1220
1221             leader = new Leader(leaderActorContext);
1222
1223             leader.markFollowerActive("follower-1");
1224             leader.markFollowerActive("follower-2");
1225             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1226             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1227                 behavior instanceof Leader);
1228
1229             // kill 1 follower and verify if that got killed
1230             final JavaTestKit probe = new JavaTestKit(getSystem());
1231             probe.watch(followerActor1);
1232             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1233             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1234             assertEquals(termMsg1.getActor(), followerActor1);
1235
1236             leader.markFollowerInActive("follower-1");
1237             leader.markFollowerActive("follower-2");
1238             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1239             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1240                 behavior instanceof Leader);
1241
1242             // kill 2nd follower and leader should change to Isolated leader
1243             followerActor2.tell(PoisonPill.getInstance(), null);
1244             probe.watch(followerActor2);
1245             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1246             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1247             assertEquals(termMsg2.getActor(), followerActor2);
1248
1249             leader.markFollowerInActive("follower-2");
1250             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1251             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1252                 behavior instanceof IsolatedLeader);
1253         }};
1254     }
1255
1256
1257     @Test
1258     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1259         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1260
1261         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1262
1263         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1264         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1265         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1266
1267         leaderActorContext.setConfigParams(configParams);
1268
1269         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1270
1271         followerActorContext.setConfigParams(configParams);
1272
1273         Follower follower = new Follower(followerActorContext);
1274         followerActor.underlyingActor().setBehavior(follower);
1275
1276         leaderActorContext.getReplicatedLog().removeFrom(0);
1277         leaderActorContext.setCommitIndex(-1);
1278         leaderActorContext.setLastApplied(-1);
1279
1280         followerActorContext.getReplicatedLog().removeFrom(0);
1281         followerActorContext.setCommitIndex(-1);
1282         followerActorContext.setLastApplied(-1);
1283
1284         leader = new Leader(leaderActorContext);
1285
1286         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1287                 leaderActor, AppendEntriesReply.class);
1288
1289         leader.handleMessage(followerActor, appendEntriesReply);
1290
1291         // Clear initial heartbeat messages
1292
1293         leaderActor.underlyingActor().clear();
1294         followerActor.underlyingActor().clear();
1295
1296         // create 3 entries
1297         leaderActorContext.setReplicatedLog(
1298                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1299         leaderActorContext.setCommitIndex(1);
1300         leaderActorContext.setLastApplied(1);
1301
1302         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1303                 TimeUnit.MILLISECONDS);
1304
1305         leader.handleMessage(leaderActor, new SendHeartBeat());
1306
1307         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1308
1309         // Should send first log entry
1310         assertEquals(1, appendEntries.getLeaderCommit());
1311         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1312         assertEquals(-1, appendEntries.getPrevLogIndex());
1313
1314         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1315
1316         assertEquals(1, appendEntriesReply.getLogLastTerm());
1317         assertEquals(0, appendEntriesReply.getLogLastIndex());
1318
1319         followerActor.underlyingActor().clear();
1320
1321         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1322
1323         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1324
1325         // Should send second log entry
1326         assertEquals(1, appendEntries.getLeaderCommit());
1327         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1328
1329         follower.close();
1330     }
1331
1332     @Test
1333     public void testLaggingFollowerStarvation() throws Exception {
1334         logStart("testLaggingFollowerStarvation");
1335         new JavaTestKit(getSystem()) {{
1336             String leaderActorId = actorFactory.generateActorId("leader");
1337             String follower1ActorId = actorFactory.generateActorId("follower");
1338             String follower2ActorId = actorFactory.generateActorId("follower");
1339
1340             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1341                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1342             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1343             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1344
1345             MockRaftActorContext leaderActorContext =
1346                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1347
1348             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1349             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1350             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1351
1352             leaderActorContext.setConfigParams(configParams);
1353
1354             leaderActorContext.setReplicatedLog(
1355                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1356
1357             Map<String, String> peerAddresses = new HashMap<>();
1358             peerAddresses.put(follower1ActorId,
1359                     follower1Actor.path().toString());
1360             peerAddresses.put(follower2ActorId,
1361                     follower2Actor.path().toString());
1362
1363             leaderActorContext.setPeerAddresses(peerAddresses);
1364             leaderActorContext.getTermInformation().update(1, leaderActorId);
1365
1366             RaftActorBehavior leader = createBehavior(leaderActorContext);
1367
1368             leaderActor.underlyingActor().setBehavior(leader);
1369
1370             for(int i=1;i<6;i++) {
1371                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1372                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1373                 assertTrue(newBehavior == leader);
1374                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1375             }
1376
1377             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1378             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1379
1380             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1381                     heartbeats.size() > 1);
1382
1383             // Check if follower-2 got AppendEntries during this time and was not starved
1384             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1385
1386             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1387                     appendEntries.size() > 1);
1388
1389         }};
1390     }
1391
1392     @Override
1393     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1394             ActorRef actorRef, RaftRPC rpc) throws Exception {
1395         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1396         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1397     }
1398
1399     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1400
1401         private final long electionTimeOutIntervalMillis;
1402         private final int snapshotChunkSize;
1403
1404         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1405             super();
1406             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1407             this.snapshotChunkSize = snapshotChunkSize;
1408         }
1409
1410         @Override
1411         public FiniteDuration getElectionTimeOutInterval() {
1412             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1413         }
1414
1415         @Override
1416         public int getSnapshotChunkSize() {
1417             return snapshotChunkSize;
1418         }
1419     }
1420 }