383ebefd36685f6298524ef87353281c325d1abd
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
50     static final String FOLLOWER_ID = "follower";
51
52     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
54
55     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
57
58     private Leader leader;
59
60     @Override
61     @After
62     public void tearDown() throws Exception {
63         if(leader != null) {
64             leader.close();
65         }
66
67         super.tearDown();
68     }
69
70     @Test
71     public void testHandleMessageForUnknownMessage() throws Exception {
72         logStart("testHandleMessageForUnknownMessage");
73
74         leader = new Leader(createActorContext());
75
76         // handle message should return the Leader state when it receives an
77         // unknown message
78         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79         Assert.assertTrue(behavior instanceof Leader);
80     }
81
82     @Test
83     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
85
86         MockRaftActorContext actorContext = createActorContextWithFollower();
87
88         long term = 1;
89         actorContext.getTermInformation().update(term, "");
90
91         leader = new Leader(actorContext);
92
93         // Leader should send an immediate heartbeat with no entries as follower is inactive.
94         long lastIndex = actorContext.getReplicatedLog().lastIndex();
95         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96         assertEquals("getTerm", term, appendEntries.getTerm());
97         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99         assertEquals("Entries size", 0, appendEntries.getEntries().size());
100
101         // The follower would normally reply - simulate that explicitly here.
102         leader.handleMessage(followerActor, new AppendEntriesReply(
103                 FOLLOWER_ID, term, true, lastIndex - 1, term));
104         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
105
106         followerActor.underlyingActor().clear();
107
108         // Sleep for the heartbeat interval so AppendEntries is sent.
109         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
111
112         leader.handleMessage(leaderActor, new SendHeartBeat());
113
114         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117         assertEquals("Entries size", 1, appendEntries.getEntries().size());
118         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
120     }
121
122
123     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
124         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
125         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
126                 1, index, payload);
127         actorContext.getReplicatedLog().append(newEntry);
128         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
129     }
130
131     @Test
132     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
133         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
134
135         MockRaftActorContext actorContext = createActorContextWithFollower();
136
137         long term = 1;
138         actorContext.getTermInformation().update(term, "");
139
140         leader = new Leader(actorContext);
141
142         // Leader will send an immediate heartbeat - ignore it.
143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
144
145         // The follower would normally reply - simulate that explicitly here.
146         long lastIndex = actorContext.getReplicatedLog().lastIndex();
147         leader.handleMessage(followerActor, new AppendEntriesReply(
148                 FOLLOWER_ID, term, true, lastIndex, term));
149         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
150
151         followerActor.underlyingActor().clear();
152
153         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155                 1, lastIndex + 1, payload);
156         actorContext.getReplicatedLog().append(newEntry);
157         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
158
159         // State should not change
160         assertTrue(raftBehavior instanceof Leader);
161
162         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
163         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
164         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
165         assertEquals("Entries size", 1, appendEntries.getEntries().size());
166         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
167         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
168         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
169     }
170
171     @Test
172     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
173         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
174
175         MockRaftActorContext actorContext = createActorContextWithFollower();
176         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
177             @Override
178             public FiniteDuration getHeartBeatInterval() {
179                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
180             }
181         });
182
183         long term = 1;
184         actorContext.getTermInformation().update(term, "");
185
186         leader = new Leader(actorContext);
187
188         // Leader will send an immediate heartbeat - ignore it.
189         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190
191         // The follower would normally reply - simulate that explicitly here.
192         long lastIndex = actorContext.getReplicatedLog().lastIndex();
193         leader.handleMessage(followerActor, new AppendEntriesReply(
194                 FOLLOWER_ID, term, true, lastIndex, term));
195         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
196
197         followerActor.underlyingActor().clear();
198
199         for(int i=0;i<5;i++) {
200             sendReplicate(actorContext, lastIndex+i+1);
201         }
202
203         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
204         // We expect only 1 message to be sent because of two reasons,
205         // - an append entries reply was not received
206         // - the heartbeat interval has not expired
207         // In this scenario if multiple messages are sent they would likely be duplicates
208         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
209     }
210
211     @Test
212     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
213         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
214
215         MockRaftActorContext actorContext = createActorContextWithFollower();
216         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
217             @Override
218             public FiniteDuration getHeartBeatInterval() {
219                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
220             }
221         });
222
223         long term = 1;
224         actorContext.getTermInformation().update(term, "");
225
226         leader = new Leader(actorContext);
227
228         // Leader will send an immediate heartbeat - ignore it.
229         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
230
231         // The follower would normally reply - simulate that explicitly here.
232         long lastIndex = actorContext.getReplicatedLog().lastIndex();
233         leader.handleMessage(followerActor, new AppendEntriesReply(
234                 FOLLOWER_ID, term, true, lastIndex, term));
235         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
236
237         followerActor.underlyingActor().clear();
238
239         for(int i=0;i<3;i++) {
240             sendReplicate(actorContext, lastIndex+i+1);
241             leader.handleMessage(followerActor, new AppendEntriesReply(
242                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
243
244         }
245
246         for(int i=3;i<5;i++) {
247             sendReplicate(actorContext, lastIndex + i + 1);
248         }
249
250         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
251         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
252         // get sent to the follower - but not the 5th
253         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
254
255         for(int i=0;i<4;i++) {
256             long expected = allMessages.get(i).getEntries().get(0).getIndex();
257             assertEquals(expected, i+2);
258         }
259     }
260
261     @Test
262     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
263         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
264
265         MockRaftActorContext actorContext = createActorContextWithFollower();
266         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
267             @Override
268             public FiniteDuration getHeartBeatInterval() {
269                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
270             }
271         });
272
273         long term = 1;
274         actorContext.getTermInformation().update(term, "");
275
276         leader = new Leader(actorContext);
277
278         // Leader will send an immediate heartbeat - ignore it.
279         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
280
281         // The follower would normally reply - simulate that explicitly here.
282         long lastIndex = actorContext.getReplicatedLog().lastIndex();
283         leader.handleMessage(followerActor, new AppendEntriesReply(
284                 FOLLOWER_ID, term, true, lastIndex, term));
285         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
286
287         followerActor.underlyingActor().clear();
288
289         sendReplicate(actorContext, lastIndex+1);
290
291         // Wait slightly longer than heartbeat duration
292         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
293
294         leader.handleMessage(leaderActor, new SendHeartBeat());
295
296         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
297         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
298
299         assertEquals(1, allMessages.get(0).getEntries().size());
300         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
301         assertEquals(1, allMessages.get(1).getEntries().size());
302         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
303
304     }
305
306     @Test
307     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
308         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
309
310         MockRaftActorContext actorContext = createActorContextWithFollower();
311         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312             @Override
313             public FiniteDuration getHeartBeatInterval() {
314                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
315             }
316         });
317
318         long term = 1;
319         actorContext.getTermInformation().update(term, "");
320
321         leader = new Leader(actorContext);
322
323         // Leader will send an immediate heartbeat - ignore it.
324         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325
326         // The follower would normally reply - simulate that explicitly here.
327         long lastIndex = actorContext.getReplicatedLog().lastIndex();
328         leader.handleMessage(followerActor, new AppendEntriesReply(
329                 FOLLOWER_ID, term, true, lastIndex, term));
330         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
331
332         followerActor.underlyingActor().clear();
333
334         for(int i=0;i<3;i++) {
335             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
336             leader.handleMessage(leaderActor, new SendHeartBeat());
337         }
338
339         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
340         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
341     }
342
343     @Test
344     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
345         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
346
347         MockRaftActorContext actorContext = createActorContextWithFollower();
348         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
349             @Override
350             public FiniteDuration getHeartBeatInterval() {
351                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
352             }
353         });
354
355         long term = 1;
356         actorContext.getTermInformation().update(term, "");
357
358         leader = new Leader(actorContext);
359
360         // Leader will send an immediate heartbeat - ignore it.
361         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
362
363         // The follower would normally reply - simulate that explicitly here.
364         long lastIndex = actorContext.getReplicatedLog().lastIndex();
365         leader.handleMessage(followerActor, new AppendEntriesReply(
366                 FOLLOWER_ID, term, true, lastIndex, term));
367         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
368
369         followerActor.underlyingActor().clear();
370
371         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
372         leader.handleMessage(leaderActor, new SendHeartBeat());
373         sendReplicate(actorContext, lastIndex+1);
374
375         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
376         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
377
378         assertEquals(0, allMessages.get(0).getEntries().size());
379         assertEquals(1, allMessages.get(1).getEntries().size());
380     }
381
382
383     @Test
384     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
385         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
386
387         MockRaftActorContext actorContext = createActorContext();
388
389         leader = new Leader(actorContext);
390
391         actorContext.setLastApplied(0);
392
393         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
394         long term = actorContext.getTermInformation().getCurrentTerm();
395         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
396                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
397
398         actorContext.getReplicatedLog().append(newEntry);
399
400         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
401                 new Replicate(leaderActor, "state-id", newEntry));
402
403         // State should not change
404         assertTrue(raftBehavior instanceof Leader);
405
406         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
407
408         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
409         // one since lastApplied state is 0.
410         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
411                 leaderActor, ApplyState.class);
412         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
413
414         for(int i = 0; i <= newLogIndex - 1; i++ ) {
415             ApplyState applyState = applyStateList.get(i);
416             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
417             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
418         }
419
420         ApplyState last = applyStateList.get((int) newLogIndex - 1);
421         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
422         assertEquals("getIdentifier", "state-id", last.getIdentifier());
423     }
424
425     @Test
426     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
427         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
428
429         MockRaftActorContext actorContext = createActorContextWithFollower();
430
431         Map<String, String> leadersSnapshot = new HashMap<>();
432         leadersSnapshot.put("1", "A");
433         leadersSnapshot.put("2", "B");
434         leadersSnapshot.put("3", "C");
435
436         //clears leaders log
437         actorContext.getReplicatedLog().removeFrom(0);
438
439         final int followersLastIndex = 2;
440         final int snapshotIndex = 3;
441         final int newEntryIndex = 4;
442         final int snapshotTerm = 1;
443         final int currentTerm = 2;
444
445         // set the snapshot variables in replicatedlog
446         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
447         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
448         actorContext.setCommitIndex(followersLastIndex);
449         //set follower timeout to 2 mins, helps during debugging
450         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
451
452         leader = new Leader(actorContext);
453
454         // new entry
455         ReplicatedLogImplEntry entry =
456                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
457                         new MockRaftActorContext.MockPayload("D"));
458
459         //update follower timestamp
460         leader.markFollowerActive(FOLLOWER_ID);
461
462         ByteString bs = toByteString(leadersSnapshot);
463         leader.setSnapshot(Optional.of(bs));
464         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
465         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
466
467         //send first chunk and no InstallSnapshotReply received yet
468         fts.getNextChunk();
469         fts.incrementChunkIndex();
470
471         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
472                 TimeUnit.MILLISECONDS);
473
474         leader.handleMessage(leaderActor, new SendHeartBeat());
475
476         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
477
478         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
479
480         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
481
482         //InstallSnapshotReply received
483         fts.markSendStatus(true);
484
485         leader.handleMessage(leaderActor, new SendHeartBeat());
486
487         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
488
489         assertEquals(snapshotIndex, is.getLastIncludedIndex());
490     }
491
492     @Test
493     public void testSendAppendEntriesSnapshotScenario() throws Exception {
494         logStart("testSendAppendEntriesSnapshotScenario");
495
496         MockRaftActorContext actorContext = createActorContextWithFollower();
497
498         Map<String, String> leadersSnapshot = new HashMap<>();
499         leadersSnapshot.put("1", "A");
500         leadersSnapshot.put("2", "B");
501         leadersSnapshot.put("3", "C");
502
503         //clears leaders log
504         actorContext.getReplicatedLog().removeFrom(0);
505
506         final int followersLastIndex = 2;
507         final int snapshotIndex = 3;
508         final int newEntryIndex = 4;
509         final int snapshotTerm = 1;
510         final int currentTerm = 2;
511
512         // set the snapshot variables in replicatedlog
513         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
514         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
515         actorContext.setCommitIndex(followersLastIndex);
516
517         leader = new Leader(actorContext);
518
519         // Leader will send an immediate heartbeat - ignore it.
520         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
521
522         // new entry
523         ReplicatedLogImplEntry entry =
524                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
525                         new MockRaftActorContext.MockPayload("D"));
526
527         //update follower timestamp
528         leader.markFollowerActive(FOLLOWER_ID);
529
530         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
531         RaftActorBehavior raftBehavior = leader.handleMessage(
532                 leaderActor, new Replicate(null, "state-id", entry));
533
534         assertTrue(raftBehavior instanceof Leader);
535
536         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
537     }
538
539     @Test
540     public void testInitiateInstallSnapshot() throws Exception {
541         logStart("testInitiateInstallSnapshot");
542
543         MockRaftActorContext actorContext = createActorContextWithFollower();
544
545         Map<String, String> leadersSnapshot = new HashMap<>();
546         leadersSnapshot.put("1", "A");
547         leadersSnapshot.put("2", "B");
548         leadersSnapshot.put("3", "C");
549
550         //clears leaders log
551         actorContext.getReplicatedLog().removeFrom(0);
552
553         final int followersLastIndex = 2;
554         final int snapshotIndex = 3;
555         final int newEntryIndex = 4;
556         final int snapshotTerm = 1;
557         final int currentTerm = 2;
558
559         // set the snapshot variables in replicatedlog
560         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
561         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
562         actorContext.setLastApplied(3);
563         actorContext.setCommitIndex(followersLastIndex);
564
565         leader = new Leader(actorContext);
566
567         // Leader will send an immediate heartbeat - ignore it.
568         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
569
570         // set the snapshot as absent and check if capture-snapshot is invoked.
571         leader.setSnapshot(Optional.<ByteString>absent());
572
573         // new entry
574         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
575                 new MockRaftActorContext.MockPayload("D"));
576
577         actorContext.getReplicatedLog().append(entry);
578
579         //update follower timestamp
580         leader.markFollowerActive(FOLLOWER_ID);
581
582         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
583
584         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
585
586         assertTrue(cs.isInstallSnapshotInitiated());
587         assertEquals(3, cs.getLastAppliedIndex());
588         assertEquals(1, cs.getLastAppliedTerm());
589         assertEquals(4, cs.getLastIndex());
590         assertEquals(2, cs.getLastTerm());
591
592         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
593         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
594
595         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
596         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
597     }
598
599     @Test
600     public void testInstallSnapshot() throws Exception {
601         logStart("testInstallSnapshot");
602
603         MockRaftActorContext actorContext = createActorContextWithFollower();
604
605         Map<String, String> leadersSnapshot = new HashMap<>();
606         leadersSnapshot.put("1", "A");
607         leadersSnapshot.put("2", "B");
608         leadersSnapshot.put("3", "C");
609
610         //clears leaders log
611         actorContext.getReplicatedLog().removeFrom(0);
612
613         final int followersLastIndex = 2;
614         final int snapshotIndex = 3;
615         final int snapshotTerm = 1;
616         final int currentTerm = 2;
617
618         // set the snapshot variables in replicatedlog
619         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
620         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
621         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
622         actorContext.setCommitIndex(followersLastIndex);
623
624         leader = new Leader(actorContext);
625
626         // Ignore initial heartbeat.
627         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
628
629         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
630                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
631
632         assertTrue(raftBehavior instanceof Leader);
633
634         // check if installsnapshot gets called with the correct values.
635
636         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
637
638         assertNotNull(installSnapshot.getData());
639         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
640         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
641
642         assertEquals(currentTerm, installSnapshot.getTerm());
643     }
644
645     @Test
646     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
647         logStart("testHandleInstallSnapshotReplyLastChunk");
648
649         MockRaftActorContext actorContext = createActorContextWithFollower();
650
651         final int followersLastIndex = 2;
652         final int snapshotIndex = 3;
653         final int snapshotTerm = 1;
654         final int currentTerm = 2;
655
656         actorContext.setCommitIndex(followersLastIndex);
657
658         leader = new Leader(actorContext);
659
660         // Ignore initial heartbeat.
661         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
662
663         Map<String, String> leadersSnapshot = new HashMap<>();
664         leadersSnapshot.put("1", "A");
665         leadersSnapshot.put("2", "B");
666         leadersSnapshot.put("3", "C");
667
668         // set the snapshot variables in replicatedlog
669
670         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
671         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
672         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
673
674         ByteString bs = toByteString(leadersSnapshot);
675         leader.setSnapshot(Optional.of(bs));
676         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
677         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
678         while(!fts.isLastChunk(fts.getChunkIndex())) {
679             fts.getNextChunk();
680             fts.incrementChunkIndex();
681         }
682
683         //clears leaders log
684         actorContext.getReplicatedLog().removeFrom(0);
685
686         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
687                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
688
689         assertTrue(raftBehavior instanceof Leader);
690
691         assertEquals(0, leader.followerSnapshotSize());
692         assertEquals(1, leader.followerLogSize());
693         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
694         assertNotNull(fli);
695         assertEquals(snapshotIndex, fli.getMatchIndex());
696         assertEquals(snapshotIndex, fli.getMatchIndex());
697         assertEquals(snapshotIndex + 1, fli.getNextIndex());
698     }
699
700     @Test
701     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
702         logStart("testSendSnapshotfromInstallSnapshotReply");
703
704         MockRaftActorContext actorContext = createActorContextWithFollower();
705
706         final int followersLastIndex = 2;
707         final int snapshotIndex = 3;
708         final int snapshotTerm = 1;
709         final int currentTerm = 2;
710
711         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
712             @Override
713             public int getSnapshotChunkSize() {
714                 return 50;
715             }
716         };
717         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
718         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
719
720         actorContext.setConfigParams(configParams);
721         actorContext.setCommitIndex(followersLastIndex);
722
723         leader = new Leader(actorContext);
724
725         Map<String, String> leadersSnapshot = new HashMap<>();
726         leadersSnapshot.put("1", "A");
727         leadersSnapshot.put("2", "B");
728         leadersSnapshot.put("3", "C");
729
730         // set the snapshot variables in replicatedlog
731         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
734
735         ByteString bs = toByteString(leadersSnapshot);
736         leader.setSnapshot(Optional.of(bs));
737
738         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
739
740         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
741
742         assertEquals(1, installSnapshot.getChunkIndex());
743         assertEquals(3, installSnapshot.getTotalChunks());
744
745         followerActor.underlyingActor().clear();
746         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
747                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
748
749         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
750
751         assertEquals(2, installSnapshot.getChunkIndex());
752         assertEquals(3, installSnapshot.getTotalChunks());
753
754         followerActor.underlyingActor().clear();
755         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
756                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
757
758         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
759
760         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
761         followerActor.underlyingActor().clear();
762         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
763                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
764
765         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
766
767         Assert.assertNull(installSnapshot);
768     }
769
770
771     @Test
772     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
773         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
774
775         MockRaftActorContext actorContext = createActorContextWithFollower();
776
777         final int followersLastIndex = 2;
778         final int snapshotIndex = 3;
779         final int snapshotTerm = 1;
780         final int currentTerm = 2;
781
782         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
783             @Override
784             public int getSnapshotChunkSize() {
785                 return 50;
786             }
787         });
788
789         actorContext.setCommitIndex(followersLastIndex);
790
791         leader = new Leader(actorContext);
792
793         Map<String, String> leadersSnapshot = new HashMap<>();
794         leadersSnapshot.put("1", "A");
795         leadersSnapshot.put("2", "B");
796         leadersSnapshot.put("3", "C");
797
798         // set the snapshot variables in replicatedlog
799         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
800         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
801         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
802
803         ByteString bs = toByteString(leadersSnapshot);
804         leader.setSnapshot(Optional.of(bs));
805
806         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
807         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
808
809         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
810
811         assertEquals(1, installSnapshot.getChunkIndex());
812         assertEquals(3, installSnapshot.getTotalChunks());
813
814         followerActor.underlyingActor().clear();
815
816         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
817                 FOLLOWER_ID, -1, false));
818
819         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
820                 TimeUnit.MILLISECONDS);
821
822         leader.handleMessage(leaderActor, new SendHeartBeat());
823
824         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825
826         assertEquals(1, installSnapshot.getChunkIndex());
827         assertEquals(3, installSnapshot.getTotalChunks());
828     }
829
830     @Test
831     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
832         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
833
834         MockRaftActorContext actorContext = createActorContextWithFollower();
835
836         final int followersLastIndex = 2;
837         final int snapshotIndex = 3;
838         final int snapshotTerm = 1;
839         final int currentTerm = 2;
840
841         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
842             @Override
843             public int getSnapshotChunkSize() {
844                 return 50;
845             }
846         });
847
848         actorContext.setCommitIndex(followersLastIndex);
849
850         leader = new Leader(actorContext);
851
852         Map<String, String> leadersSnapshot = new HashMap<>();
853         leadersSnapshot.put("1", "A");
854         leadersSnapshot.put("2", "B");
855         leadersSnapshot.put("3", "C");
856
857         // set the snapshot variables in replicatedlog
858         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
859         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
860         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
861
862         ByteString bs = toByteString(leadersSnapshot);
863         leader.setSnapshot(Optional.of(bs));
864
865         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
866
867         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
868
869         assertEquals(1, installSnapshot.getChunkIndex());
870         assertEquals(3, installSnapshot.getTotalChunks());
871         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
872
873         int hashCode = installSnapshot.getData().hashCode();
874
875         followerActor.underlyingActor().clear();
876
877         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
878                 FOLLOWER_ID, 1, true));
879
880         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
881
882         assertEquals(2, installSnapshot.getChunkIndex());
883         assertEquals(3, installSnapshot.getTotalChunks());
884         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
885     }
886
887     @Test
888     public void testFollowerToSnapshotLogic() {
889         logStart("testFollowerToSnapshotLogic");
890
891         MockRaftActorContext actorContext = createActorContext();
892
893         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
894             @Override
895             public int getSnapshotChunkSize() {
896                 return 50;
897             }
898         });
899
900         leader = new Leader(actorContext);
901
902         Map<String, String> leadersSnapshot = new HashMap<>();
903         leadersSnapshot.put("1", "A");
904         leadersSnapshot.put("2", "B");
905         leadersSnapshot.put("3", "C");
906
907         ByteString bs = toByteString(leadersSnapshot);
908         byte[] barray = bs.toByteArray();
909
910         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
911         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
912
913         assertEquals(bs.size(), barray.length);
914
915         int chunkIndex=0;
916         for (int i=0; i < barray.length; i = i + 50) {
917             int j = i + 50;
918             chunkIndex++;
919
920             if (i + 50 > barray.length) {
921                 j = barray.length;
922             }
923
924             ByteString chunk = fts.getNextChunk();
925             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
926             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
927
928             fts.markSendStatus(true);
929             if (!fts.isLastChunk(chunkIndex)) {
930                 fts.incrementChunkIndex();
931             }
932         }
933
934         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
935     }
936
937     @Override protected RaftActorBehavior createBehavior(
938         RaftActorContext actorContext) {
939         return new Leader(actorContext);
940     }
941
942     @Override
943     protected MockRaftActorContext createActorContext() {
944         return createActorContext(leaderActor);
945     }
946
947     @Override
948     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
949         return createActorContext("leader", actorRef);
950     }
951
952     private MockRaftActorContext createActorContextWithFollower() {
953         MockRaftActorContext actorContext = createActorContext();
954         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
955                 followerActor.path().toString()).build());
956         return actorContext;
957     }
958
959     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
960         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
961         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
962         configParams.setElectionTimeoutFactor(100000);
963         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
964         context.setConfigParams(configParams);
965         return context;
966     }
967
968     @Test
969     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
970         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
971
972         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
973
974         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
975
976         Follower follower = new Follower(followerActorContext);
977         followerActor.underlyingActor().setBehavior(follower);
978
979         Map<String, String> peerAddresses = new HashMap<>();
980         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
981
982         leaderActorContext.setPeerAddresses(peerAddresses);
983
984         leaderActorContext.getReplicatedLog().removeFrom(0);
985
986         //create 3 entries
987         leaderActorContext.setReplicatedLog(
988                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
989
990         leaderActorContext.setCommitIndex(1);
991
992         followerActorContext.getReplicatedLog().removeFrom(0);
993
994         // follower too has the exact same log entries and has the same commit index
995         followerActorContext.setReplicatedLog(
996                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
997
998         followerActorContext.setCommitIndex(1);
999
1000         leader = new Leader(leaderActorContext);
1001
1002         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1003
1004         assertEquals(1, appendEntries.getLeaderCommit());
1005         assertEquals(0, appendEntries.getEntries().size());
1006         assertEquals(0, appendEntries.getPrevLogIndex());
1007
1008         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1009                 leaderActor, AppendEntriesReply.class);
1010
1011         assertEquals(2, appendEntriesReply.getLogLastIndex());
1012         assertEquals(1, appendEntriesReply.getLogLastTerm());
1013
1014         // follower returns its next index
1015         assertEquals(2, appendEntriesReply.getLogLastIndex());
1016         assertEquals(1, appendEntriesReply.getLogLastTerm());
1017
1018         follower.close();
1019     }
1020
1021     @Test
1022     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1023         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1024
1025         MockRaftActorContext leaderActorContext = createActorContext();
1026
1027         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1028
1029         Follower follower = new Follower(followerActorContext);
1030         followerActor.underlyingActor().setBehavior(follower);
1031
1032         Map<String, String> peerAddresses = new HashMap<>();
1033         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1034
1035         leaderActorContext.setPeerAddresses(peerAddresses);
1036
1037         leaderActorContext.getReplicatedLog().removeFrom(0);
1038
1039         leaderActorContext.setReplicatedLog(
1040                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1041
1042         leaderActorContext.setCommitIndex(1);
1043
1044         followerActorContext.getReplicatedLog().removeFrom(0);
1045
1046         followerActorContext.setReplicatedLog(
1047                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1048
1049         // follower has the same log entries but its commit index > leaders commit index
1050         followerActorContext.setCommitIndex(2);
1051
1052         leader = new Leader(leaderActorContext);
1053
1054         // Initial heartbeat
1055         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1056
1057         assertEquals(1, appendEntries.getLeaderCommit());
1058         assertEquals(0, appendEntries.getEntries().size());
1059         assertEquals(0, appendEntries.getPrevLogIndex());
1060
1061         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1062                 leaderActor, AppendEntriesReply.class);
1063
1064         assertEquals(2, appendEntriesReply.getLogLastIndex());
1065         assertEquals(1, appendEntriesReply.getLogLastTerm());
1066
1067         leaderActor.underlyingActor().setBehavior(follower);
1068         leader.handleMessage(followerActor, appendEntriesReply);
1069
1070         leaderActor.underlyingActor().clear();
1071         followerActor.underlyingActor().clear();
1072
1073         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1074                 TimeUnit.MILLISECONDS);
1075
1076         leader.handleMessage(leaderActor, new SendHeartBeat());
1077
1078         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1079
1080         assertEquals(2, appendEntries.getLeaderCommit());
1081         assertEquals(0, appendEntries.getEntries().size());
1082         assertEquals(2, appendEntries.getPrevLogIndex());
1083
1084         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1085
1086         assertEquals(2, appendEntriesReply.getLogLastIndex());
1087         assertEquals(1, appendEntriesReply.getLogLastTerm());
1088
1089         assertEquals(2, followerActorContext.getCommitIndex());
1090
1091         follower.close();
1092     }
1093
1094     @Test
1095     public void testHandleAppendEntriesReplyFailure(){
1096         logStart("testHandleAppendEntriesReplyFailure");
1097
1098         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1099
1100         leader = new Leader(leaderActorContext);
1101
1102         // Send initial heartbeat reply with last index.
1103         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1104
1105         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1106         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1107
1108         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1109
1110         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1111
1112         assertEquals(RaftState.Leader, raftActorBehavior.state());
1113
1114         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1115     }
1116
1117     @Test
1118     public void testHandleAppendEntriesReplySuccess() throws Exception {
1119         logStart("testHandleAppendEntriesReplySuccess");
1120
1121         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1122
1123         leaderActorContext.setReplicatedLog(
1124                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1125
1126         leaderActorContext.setCommitIndex(1);
1127         leaderActorContext.setLastApplied(1);
1128         leaderActorContext.getTermInformation().update(1, "leader");
1129
1130         leader = new Leader(leaderActorContext);
1131
1132         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1133
1134         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1135
1136         assertEquals(RaftState.Leader, raftActorBehavior.state());
1137
1138         assertEquals(2, leaderActorContext.getCommitIndex());
1139
1140         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1141                 leaderActor, ApplyJournalEntries.class);
1142
1143         assertEquals(2, leaderActorContext.getLastApplied());
1144
1145         assertEquals(2, applyJournalEntries.getToIndex());
1146
1147         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1148                 ApplyState.class);
1149
1150         assertEquals(1,applyStateList.size());
1151
1152         ApplyState applyState = applyStateList.get(0);
1153
1154         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1155     }
1156
1157     @Test
1158     public void testHandleAppendEntriesReplyUnknownFollower(){
1159         logStart("testHandleAppendEntriesReplyUnknownFollower");
1160
1161         MockRaftActorContext leaderActorContext = createActorContext();
1162
1163         leader = new Leader(leaderActorContext);
1164
1165         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1166
1167         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1168
1169         assertEquals(RaftState.Leader, raftActorBehavior.state());
1170     }
1171
1172     @Test
1173     public void testHandleRequestVoteReply(){
1174         logStart("testHandleRequestVoteReply");
1175
1176         MockRaftActorContext leaderActorContext = createActorContext();
1177
1178         leader = new Leader(leaderActorContext);
1179
1180         // Should be a no-op.
1181         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1182                 new RequestVoteReply(1, true));
1183
1184         assertEquals(RaftState.Leader, raftActorBehavior.state());
1185
1186         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1187
1188         assertEquals(RaftState.Leader, raftActorBehavior.state());
1189     }
1190
1191     @Test
1192     public void testIsolatedLeaderCheckNoFollowers() {
1193         logStart("testIsolatedLeaderCheckNoFollowers");
1194
1195         MockRaftActorContext leaderActorContext = createActorContext();
1196
1197         leader = new Leader(leaderActorContext);
1198         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1199         Assert.assertTrue(behavior instanceof Leader);
1200     }
1201
1202     @Test
1203     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1204         logStart("testIsolatedLeaderCheckTwoFollowers");
1205
1206         new JavaTestKit(getSystem()) {{
1207
1208             ActorRef followerActor1 = getTestActor();
1209             ActorRef followerActor2 = getTestActor();
1210
1211             MockRaftActorContext leaderActorContext = createActorContext();
1212
1213             Map<String, String> peerAddresses = new HashMap<>();
1214             peerAddresses.put("follower-1", followerActor1.path().toString());
1215             peerAddresses.put("follower-2", followerActor2.path().toString());
1216
1217             leaderActorContext.setPeerAddresses(peerAddresses);
1218
1219             leader = new Leader(leaderActorContext);
1220
1221             leader.markFollowerActive("follower-1");
1222             leader.markFollowerActive("follower-2");
1223             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1224             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1225                 behavior instanceof Leader);
1226
1227             // kill 1 follower and verify if that got killed
1228             final JavaTestKit probe = new JavaTestKit(getSystem());
1229             probe.watch(followerActor1);
1230             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1231             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1232             assertEquals(termMsg1.getActor(), followerActor1);
1233
1234             leader.markFollowerInActive("follower-1");
1235             leader.markFollowerActive("follower-2");
1236             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1237             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1238                 behavior instanceof Leader);
1239
1240             // kill 2nd follower and leader should change to Isolated leader
1241             followerActor2.tell(PoisonPill.getInstance(), null);
1242             probe.watch(followerActor2);
1243             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1244             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1245             assertEquals(termMsg2.getActor(), followerActor2);
1246
1247             leader.markFollowerInActive("follower-2");
1248             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1249             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1250                 behavior instanceof IsolatedLeader);
1251         }};
1252     }
1253
1254
1255     @Test
1256     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1257         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1258
1259         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1260
1261         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1262         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1263         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1264
1265         leaderActorContext.setConfigParams(configParams);
1266
1267         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1268
1269         followerActorContext.setConfigParams(configParams);
1270
1271         Follower follower = new Follower(followerActorContext);
1272         followerActor.underlyingActor().setBehavior(follower);
1273
1274         leaderActorContext.getReplicatedLog().removeFrom(0);
1275         leaderActorContext.setCommitIndex(-1);
1276         leaderActorContext.setLastApplied(-1);
1277
1278         followerActorContext.getReplicatedLog().removeFrom(0);
1279         followerActorContext.setCommitIndex(-1);
1280         followerActorContext.setLastApplied(-1);
1281
1282         leader = new Leader(leaderActorContext);
1283
1284         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1285                 leaderActor, AppendEntriesReply.class);
1286
1287         leader.handleMessage(followerActor, appendEntriesReply);
1288
1289         // Clear initial heartbeat messages
1290
1291         leaderActor.underlyingActor().clear();
1292         followerActor.underlyingActor().clear();
1293
1294         // create 3 entries
1295         leaderActorContext.setReplicatedLog(
1296                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1297         leaderActorContext.setCommitIndex(1);
1298         leaderActorContext.setLastApplied(1);
1299
1300         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1301                 TimeUnit.MILLISECONDS);
1302
1303         leader.handleMessage(leaderActor, new SendHeartBeat());
1304
1305         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1306
1307         // Should send first log entry
1308         assertEquals(1, appendEntries.getLeaderCommit());
1309         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1310         assertEquals(-1, appendEntries.getPrevLogIndex());
1311
1312         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1313
1314         assertEquals(1, appendEntriesReply.getLogLastTerm());
1315         assertEquals(0, appendEntriesReply.getLogLastIndex());
1316
1317         followerActor.underlyingActor().clear();
1318
1319         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1320
1321         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1322
1323         // Should send second log entry
1324         assertEquals(1, appendEntries.getLeaderCommit());
1325         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1326
1327         follower.close();
1328     }
1329
1330     @Test
1331     public void testLaggingFollowerStarvation() throws Exception {
1332         logStart("testLaggingFollowerStarvation");
1333         new JavaTestKit(getSystem()) {{
1334             String leaderActorId = actorFactory.generateActorId("leader");
1335             String follower1ActorId = actorFactory.generateActorId("follower");
1336             String follower2ActorId = actorFactory.generateActorId("follower");
1337
1338             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1339                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1340             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1341             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1342
1343             MockRaftActorContext leaderActorContext =
1344                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1345
1346             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1347             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1348             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1349
1350             leaderActorContext.setConfigParams(configParams);
1351
1352             leaderActorContext.setReplicatedLog(
1353                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1354
1355             Map<String, String> peerAddresses = new HashMap<>();
1356             peerAddresses.put(follower1ActorId,
1357                     follower1Actor.path().toString());
1358             peerAddresses.put(follower2ActorId,
1359                     follower2Actor.path().toString());
1360
1361             leaderActorContext.setPeerAddresses(peerAddresses);
1362             leaderActorContext.getTermInformation().update(1, leaderActorId);
1363
1364             RaftActorBehavior leader = createBehavior(leaderActorContext);
1365
1366             leaderActor.underlyingActor().setBehavior(leader);
1367
1368             for(int i=1;i<6;i++) {
1369                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1370                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1371                 assertTrue(newBehavior == leader);
1372                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1373             }
1374
1375             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1376             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1377
1378             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1379                     heartbeats.size() > 1);
1380
1381             // Check if follower-2 got AppendEntries during this time and was not starved
1382             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1383
1384             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1385                     appendEntries.size() > 1);
1386
1387         }};
1388     }
1389
1390     @Override
1391     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1392             ActorRef actorRef, RaftRPC rpc) throws Exception {
1393         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1394         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1395     }
1396
1397     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1398
1399         private final long electionTimeOutIntervalMillis;
1400         private final int snapshotChunkSize;
1401
1402         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1403             super();
1404             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1405             this.snapshotChunkSize = snapshotChunkSize;
1406         }
1407
1408         @Override
1409         public FiniteDuration getElectionTimeOutInterval() {
1410             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1411         }
1412
1413         @Override
1414         public int getSnapshotChunkSize() {
1415             return snapshotChunkSize;
1416         }
1417     }
1418 }