Bug 3570: Use SnapShot lastAppliedIndex for install snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.Snapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
44 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
45 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
46 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
47 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 import scala.concurrent.duration.FiniteDuration;
49
50 public class LeaderTest extends AbstractLeaderTest {
51
52     static final String FOLLOWER_ID = "follower";
53     public static final String LEADER_ID = "leader";
54
55     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
56             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
57
58     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
59             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
60
61     private Leader leader;
62     private final short payloadVersion = 5;
63
64     @Override
65     @After
66     public void tearDown() throws Exception {
67         if(leader != null) {
68             leader.close();
69         }
70
71         super.tearDown();
72     }
73
74     @Test
75     public void testHandleMessageForUnknownMessage() throws Exception {
76         logStart("testHandleMessageForUnknownMessage");
77
78         leader = new Leader(createActorContext());
79
80         // handle message should return the Leader state when it receives an
81         // unknown message
82         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
83         Assert.assertTrue(behavior instanceof Leader);
84     }
85
86     @Test
87     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
88         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
89
90         MockRaftActorContext actorContext = createActorContextWithFollower();
91         short payloadVersion = (short)5;
92         actorContext.setPayloadVersion(payloadVersion);
93
94         long term = 1;
95         actorContext.getTermInformation().update(term, "");
96
97         leader = new Leader(actorContext);
98
99         // Leader should send an immediate heartbeat with no entries as follower is inactive.
100         long lastIndex = actorContext.getReplicatedLog().lastIndex();
101         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
102         assertEquals("getTerm", term, appendEntries.getTerm());
103         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
104         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
105         assertEquals("Entries size", 0, appendEntries.getEntries().size());
106         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
107
108         // The follower would normally reply - simulate that explicitly here.
109         leader.handleMessage(followerActor, new AppendEntriesReply(
110                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
111         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
112
113         followerActor.underlyingActor().clear();
114
115         // Sleep for the heartbeat interval so AppendEntries is sent.
116         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
117                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
118
119         leader.handleMessage(leaderActor, new SendHeartBeat());
120
121         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
122         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 1, appendEntries.getEntries().size());
125         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
126         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
127         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
128     }
129
130
131     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
132         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
133         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
134                 1, index, payload);
135         actorContext.getReplicatedLog().append(newEntry);
136         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
137     }
138
139     @Test
140     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
141         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
142
143         MockRaftActorContext actorContext = createActorContextWithFollower();
144
145         long term = 1;
146         actorContext.getTermInformation().update(term, "");
147
148         leader = new Leader(actorContext);
149
150         // Leader will send an immediate heartbeat - ignore it.
151         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
152
153         // The follower would normally reply - simulate that explicitly here.
154         long lastIndex = actorContext.getReplicatedLog().lastIndex();
155         leader.handleMessage(followerActor, new AppendEntriesReply(
156                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
157         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
158
159         followerActor.underlyingActor().clear();
160
161         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
162
163         // State should not change
164         assertTrue(raftBehavior instanceof Leader);
165
166         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
167         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
168         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
169         assertEquals("Entries size", 1, appendEntries.getEntries().size());
170         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
171         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
172         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
173     }
174
175     @Test
176     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
177         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
178
179         MockRaftActorContext actorContext = createActorContextWithFollower();
180         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
181             @Override
182             public FiniteDuration getHeartBeatInterval() {
183                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
184             }
185         });
186
187         long term = 1;
188         actorContext.getTermInformation().update(term, "");
189
190         leader = new Leader(actorContext);
191
192         // Leader will send an immediate heartbeat - ignore it.
193         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
194
195         // The follower would normally reply - simulate that explicitly here.
196         long lastIndex = actorContext.getReplicatedLog().lastIndex();
197         leader.handleMessage(followerActor, new AppendEntriesReply(
198                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
199         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
200
201         followerActor.underlyingActor().clear();
202
203         for(int i=0;i<5;i++) {
204             sendReplicate(actorContext, lastIndex+i+1);
205         }
206
207         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
208         // We expect only 1 message to be sent because of two reasons,
209         // - an append entries reply was not received
210         // - the heartbeat interval has not expired
211         // In this scenario if multiple messages are sent they would likely be duplicates
212         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
213     }
214
215     @Test
216     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
217         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
218
219         MockRaftActorContext actorContext = createActorContextWithFollower();
220         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
221             @Override
222             public FiniteDuration getHeartBeatInterval() {
223                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
224             }
225         });
226
227         long term = 1;
228         actorContext.getTermInformation().update(term, "");
229
230         leader = new Leader(actorContext);
231
232         // Leader will send an immediate heartbeat - ignore it.
233         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234
235         // The follower would normally reply - simulate that explicitly here.
236         long lastIndex = actorContext.getReplicatedLog().lastIndex();
237         leader.handleMessage(followerActor, new AppendEntriesReply(
238                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
239         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
240
241         followerActor.underlyingActor().clear();
242
243         for(int i=0;i<3;i++) {
244             sendReplicate(actorContext, lastIndex+i+1);
245             leader.handleMessage(followerActor, new AppendEntriesReply(
246                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
247
248         }
249
250         for(int i=3;i<5;i++) {
251             sendReplicate(actorContext, lastIndex + i + 1);
252         }
253
254         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
255         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
256         // get sent to the follower - but not the 5th
257         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
258
259         for(int i=0;i<4;i++) {
260             long expected = allMessages.get(i).getEntries().get(0).getIndex();
261             assertEquals(expected, i+2);
262         }
263     }
264
265     @Test
266     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
267         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
268
269         MockRaftActorContext actorContext = createActorContextWithFollower();
270         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
271             @Override
272             public FiniteDuration getHeartBeatInterval() {
273                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
274             }
275         });
276
277         long term = 1;
278         actorContext.getTermInformation().update(term, "");
279
280         leader = new Leader(actorContext);
281
282         // Leader will send an immediate heartbeat - ignore it.
283         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284
285         // The follower would normally reply - simulate that explicitly here.
286         long lastIndex = actorContext.getReplicatedLog().lastIndex();
287         leader.handleMessage(followerActor, new AppendEntriesReply(
288                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
289         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290
291         followerActor.underlyingActor().clear();
292
293         sendReplicate(actorContext, lastIndex+1);
294
295         // Wait slightly longer than heartbeat duration
296         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
297
298         leader.handleMessage(leaderActor, new SendHeartBeat());
299
300         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
301         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
302
303         assertEquals(1, allMessages.get(0).getEntries().size());
304         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
305         assertEquals(1, allMessages.get(1).getEntries().size());
306         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
307
308     }
309
310     @Test
311     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
312         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
313
314         MockRaftActorContext actorContext = createActorContextWithFollower();
315         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
316             @Override
317             public FiniteDuration getHeartBeatInterval() {
318                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
319             }
320         });
321
322         long term = 1;
323         actorContext.getTermInformation().update(term, "");
324
325         leader = new Leader(actorContext);
326
327         // Leader will send an immediate heartbeat - ignore it.
328         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
329
330         // The follower would normally reply - simulate that explicitly here.
331         long lastIndex = actorContext.getReplicatedLog().lastIndex();
332         leader.handleMessage(followerActor, new AppendEntriesReply(
333                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
334         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
335
336         followerActor.underlyingActor().clear();
337
338         for(int i=0;i<3;i++) {
339             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
340             leader.handleMessage(leaderActor, new SendHeartBeat());
341         }
342
343         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
344         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
345     }
346
347     @Test
348     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
349         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
350
351         MockRaftActorContext actorContext = createActorContextWithFollower();
352         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
353             @Override
354             public FiniteDuration getHeartBeatInterval() {
355                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
356             }
357         });
358
359         long term = 1;
360         actorContext.getTermInformation().update(term, "");
361
362         leader = new Leader(actorContext);
363
364         // Leader will send an immediate heartbeat - ignore it.
365         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
366
367         // The follower would normally reply - simulate that explicitly here.
368         long lastIndex = actorContext.getReplicatedLog().lastIndex();
369         leader.handleMessage(followerActor, new AppendEntriesReply(
370                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
371         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
372
373         followerActor.underlyingActor().clear();
374
375         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
376         leader.handleMessage(leaderActor, new SendHeartBeat());
377         sendReplicate(actorContext, lastIndex+1);
378
379         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
380         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
381
382         assertEquals(0, allMessages.get(0).getEntries().size());
383         assertEquals(1, allMessages.get(1).getEntries().size());
384     }
385
386
387     @Test
388     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
389         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
390
391         MockRaftActorContext actorContext = createActorContext();
392
393         leader = new Leader(actorContext);
394
395         actorContext.setLastApplied(0);
396
397         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
398         long term = actorContext.getTermInformation().getCurrentTerm();
399         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
400                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
401
402         actorContext.getReplicatedLog().append(newEntry);
403
404         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
405                 new Replicate(leaderActor, "state-id", newEntry));
406
407         // State should not change
408         assertTrue(raftBehavior instanceof Leader);
409
410         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
411
412         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
413         // one since lastApplied state is 0.
414         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
415                 leaderActor, ApplyState.class);
416         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
417
418         for(int i = 0; i <= newLogIndex - 1; i++ ) {
419             ApplyState applyState = applyStateList.get(i);
420             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
421             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
422         }
423
424         ApplyState last = applyStateList.get((int) newLogIndex - 1);
425         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
426         assertEquals("getIdentifier", "state-id", last.getIdentifier());
427     }
428
429     @Test
430     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
431         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
432
433         MockRaftActorContext actorContext = createActorContextWithFollower();
434
435         Map<String, String> leadersSnapshot = new HashMap<>();
436         leadersSnapshot.put("1", "A");
437         leadersSnapshot.put("2", "B");
438         leadersSnapshot.put("3", "C");
439
440         //clears leaders log
441         actorContext.getReplicatedLog().removeFrom(0);
442
443         final int commitIndex = 3;
444         final int snapshotIndex = 2;
445         final int newEntryIndex = 4;
446         final int snapshotTerm = 1;
447         final int currentTerm = 2;
448
449         // set the snapshot variables in replicatedlog
450         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
451         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
452         actorContext.setCommitIndex(commitIndex);
453         //set follower timeout to 2 mins, helps during debugging
454         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
455
456         leader = new Leader(actorContext);
457
458         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
459         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
460
461         // new entry
462         ReplicatedLogImplEntry entry =
463                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
464                         new MockRaftActorContext.MockPayload("D"));
465
466         //update follower timestamp
467         leader.markFollowerActive(FOLLOWER_ID);
468
469         ByteString bs = toByteString(leadersSnapshot);
470         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
471                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
472         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
473         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
474
475         //send first chunk and no InstallSnapshotReply received yet
476         fts.getNextChunk();
477         fts.incrementChunkIndex();
478
479         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
480                 TimeUnit.MILLISECONDS);
481
482         leader.handleMessage(leaderActor, new SendHeartBeat());
483
484         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
485
486         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
487
488         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
489
490         //InstallSnapshotReply received
491         fts.markSendStatus(true);
492
493         leader.handleMessage(leaderActor, new SendHeartBeat());
494
495         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
496
497         assertEquals(commitIndex, is.getLastIncludedIndex());
498     }
499
500     @Test
501     public void testSendAppendEntriesSnapshotScenario() throws Exception {
502         logStart("testSendAppendEntriesSnapshotScenario");
503
504         MockRaftActorContext actorContext = createActorContextWithFollower();
505
506         Map<String, String> leadersSnapshot = new HashMap<>();
507         leadersSnapshot.put("1", "A");
508         leadersSnapshot.put("2", "B");
509         leadersSnapshot.put("3", "C");
510
511         //clears leaders log
512         actorContext.getReplicatedLog().removeFrom(0);
513
514         final int followersLastIndex = 2;
515         final int snapshotIndex = 3;
516         final int newEntryIndex = 4;
517         final int snapshotTerm = 1;
518         final int currentTerm = 2;
519
520         // set the snapshot variables in replicatedlog
521         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
522         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
523         actorContext.setCommitIndex(followersLastIndex);
524
525         leader = new Leader(actorContext);
526
527         // Leader will send an immediate heartbeat - ignore it.
528         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
529
530         // new entry
531         ReplicatedLogImplEntry entry =
532                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
533                         new MockRaftActorContext.MockPayload("D"));
534
535         actorContext.getReplicatedLog().append(entry);
536
537         //update follower timestamp
538         leader.markFollowerActive(FOLLOWER_ID);
539
540         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
541         RaftActorBehavior raftBehavior = leader.handleMessage(
542                 leaderActor, new Replicate(null, "state-id", entry));
543
544         assertTrue(raftBehavior instanceof Leader);
545
546         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
547     }
548
549     @Test
550     public void testInitiateInstallSnapshot() throws Exception {
551         logStart("testInitiateInstallSnapshot");
552
553         MockRaftActorContext actorContext = createActorContextWithFollower();
554
555         Map<String, String> leadersSnapshot = new HashMap<>();
556         leadersSnapshot.put("1", "A");
557         leadersSnapshot.put("2", "B");
558         leadersSnapshot.put("3", "C");
559
560         //clears leaders log
561         actorContext.getReplicatedLog().removeFrom(0);
562
563         final int followersLastIndex = 2;
564         final int snapshotIndex = 3;
565         final int newEntryIndex = 4;
566         final int snapshotTerm = 1;
567         final int currentTerm = 2;
568
569         // set the snapshot variables in replicatedlog
570         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572         actorContext.setLastApplied(3);
573         actorContext.setCommitIndex(followersLastIndex);
574
575         leader = new Leader(actorContext);
576
577         // Leader will send an immediate heartbeat - ignore it.
578         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
579
580         // set the snapshot as absent and check if capture-snapshot is invoked.
581         leader.setSnapshot(null);
582
583         // new entry
584         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
585                 new MockRaftActorContext.MockPayload("D"));
586
587         actorContext.getReplicatedLog().append(entry);
588
589         //update follower timestamp
590         leader.markFollowerActive(FOLLOWER_ID);
591
592         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
593
594         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
595
596         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
597
598         assertTrue(cs.isInstallSnapshotInitiated());
599         assertEquals(3, cs.getLastAppliedIndex());
600         assertEquals(1, cs.getLastAppliedTerm());
601         assertEquals(4, cs.getLastIndex());
602         assertEquals(2, cs.getLastTerm());
603
604         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
605         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
606
607         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
608     }
609
610     @Test
611     public void testInstallSnapshot() throws Exception {
612         logStart("testInstallSnapshot");
613
614         MockRaftActorContext actorContext = createActorContextWithFollower();
615
616         Map<String, String> leadersSnapshot = new HashMap<>();
617         leadersSnapshot.put("1", "A");
618         leadersSnapshot.put("2", "B");
619         leadersSnapshot.put("3", "C");
620
621         //clears leaders log
622         actorContext.getReplicatedLog().removeFrom(0);
623
624         final int lastAppliedIndex = 3;
625         final int snapshotIndex = 2;
626         final int snapshotTerm = 1;
627         final int currentTerm = 2;
628
629         // set the snapshot variables in replicatedlog
630         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
631         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
632         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
633         actorContext.setCommitIndex(lastAppliedIndex);
634         actorContext.setLastApplied(lastAppliedIndex);
635
636         leader = new Leader(actorContext);
637
638         // Initial heartbeat.
639         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
640
641         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
642         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
643
644         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
645                 Collections.<ReplicatedLogEntry>emptyList(),
646                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
647
648         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
649
650         assertTrue(raftBehavior instanceof Leader);
651
652         // check if installsnapshot gets called with the correct values.
653
654         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
655
656         assertNotNull(installSnapshot.getData());
657         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
658         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
659
660         assertEquals(currentTerm, installSnapshot.getTerm());
661     }
662
663     @Test
664     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
665         logStart("testHandleInstallSnapshotReplyLastChunk");
666
667         MockRaftActorContext actorContext = createActorContextWithFollower();
668
669         final int commitIndex = 3;
670         final int snapshotIndex = 2;
671         final int snapshotTerm = 1;
672         final int currentTerm = 2;
673
674         actorContext.setCommitIndex(commitIndex);
675
676         leader = new Leader(actorContext);
677
678         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
679         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
680
681         // Ignore initial heartbeat.
682         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
683
684         Map<String, String> leadersSnapshot = new HashMap<>();
685         leadersSnapshot.put("1", "A");
686         leadersSnapshot.put("2", "B");
687         leadersSnapshot.put("3", "C");
688
689         // set the snapshot variables in replicatedlog
690
691         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
692         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
693         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
694
695         ByteString bs = toByteString(leadersSnapshot);
696         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
697                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
698         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
699         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
700         while(!fts.isLastChunk(fts.getChunkIndex())) {
701             fts.getNextChunk();
702             fts.incrementChunkIndex();
703         }
704
705         //clears leaders log
706         actorContext.getReplicatedLog().removeFrom(0);
707
708         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
709                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
710
711         assertTrue(raftBehavior instanceof Leader);
712
713         assertEquals(0, leader.followerSnapshotSize());
714         assertEquals(1, leader.followerLogSize());
715         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
716         assertNotNull(fli);
717         assertEquals(commitIndex, fli.getMatchIndex());
718         assertEquals(commitIndex + 1, fli.getNextIndex());
719     }
720
721     @Test
722     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
723         logStart("testSendSnapshotfromInstallSnapshotReply");
724
725         MockRaftActorContext actorContext = createActorContextWithFollower();
726
727         final int commitIndex = 3;
728         final int snapshotIndex = 2;
729         final int snapshotTerm = 1;
730         final int currentTerm = 2;
731
732         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
733             @Override
734             public int getSnapshotChunkSize() {
735                 return 50;
736             }
737         };
738         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
739         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
740
741         actorContext.setConfigParams(configParams);
742         actorContext.setCommitIndex(commitIndex);
743
744         leader = new Leader(actorContext);
745
746         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
747         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
748
749         Map<String, String> leadersSnapshot = new HashMap<>();
750         leadersSnapshot.put("1", "A");
751         leadersSnapshot.put("2", "B");
752         leadersSnapshot.put("3", "C");
753
754         // set the snapshot variables in replicatedlog
755         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
756         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
757         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
758
759         ByteString bs = toByteString(leadersSnapshot);
760         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
761                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
762         leader.setSnapshot(snapshot);
763
764         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
765
766         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
767
768         assertEquals(1, installSnapshot.getChunkIndex());
769         assertEquals(3, installSnapshot.getTotalChunks());
770
771         followerActor.underlyingActor().clear();
772         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
773                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
774
775         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
776
777         assertEquals(2, installSnapshot.getChunkIndex());
778         assertEquals(3, installSnapshot.getTotalChunks());
779
780         followerActor.underlyingActor().clear();
781         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
782                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
783
784         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
785
786         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
787         followerActor.underlyingActor().clear();
788         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
789                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
790
791         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
792
793         Assert.assertNull(installSnapshot);
794     }
795
796
797     @Test
798     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
799         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
800
801         MockRaftActorContext actorContext = createActorContextWithFollower();
802
803         final int commitIndex = 3;
804         final int snapshotIndex = 2;
805         final int snapshotTerm = 1;
806         final int currentTerm = 2;
807
808         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
809             @Override
810             public int getSnapshotChunkSize() {
811                 return 50;
812             }
813         });
814
815         actorContext.setCommitIndex(commitIndex);
816
817         leader = new Leader(actorContext);
818
819         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
820         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
821
822         Map<String, String> leadersSnapshot = new HashMap<>();
823         leadersSnapshot.put("1", "A");
824         leadersSnapshot.put("2", "B");
825         leadersSnapshot.put("3", "C");
826
827         // set the snapshot variables in replicatedlog
828         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
829         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
830         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
831
832         ByteString bs = toByteString(leadersSnapshot);
833         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
834                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
835         leader.setSnapshot(snapshot);
836
837         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
838         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
839
840         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
841
842         assertEquals(1, installSnapshot.getChunkIndex());
843         assertEquals(3, installSnapshot.getTotalChunks());
844
845         followerActor.underlyingActor().clear();
846
847         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
848                 FOLLOWER_ID, -1, false));
849
850         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
851                 TimeUnit.MILLISECONDS);
852
853         leader.handleMessage(leaderActor, new SendHeartBeat());
854
855         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
856
857         assertEquals(1, installSnapshot.getChunkIndex());
858         assertEquals(3, installSnapshot.getTotalChunks());
859     }
860
861     @Test
862     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
863         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
864
865         MockRaftActorContext actorContext = createActorContextWithFollower();
866
867         final int commitIndex = 3;
868         final int snapshotIndex = 2;
869         final int snapshotTerm = 1;
870         final int currentTerm = 2;
871
872         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
873             @Override
874             public int getSnapshotChunkSize() {
875                 return 50;
876             }
877         });
878
879         actorContext.setCommitIndex(commitIndex);
880
881         leader = new Leader(actorContext);
882
883         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
884         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
885
886         Map<String, String> leadersSnapshot = new HashMap<>();
887         leadersSnapshot.put("1", "A");
888         leadersSnapshot.put("2", "B");
889         leadersSnapshot.put("3", "C");
890
891         // set the snapshot variables in replicatedlog
892         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
893         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
894         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
895
896         ByteString bs = toByteString(leadersSnapshot);
897         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
898                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
899         leader.setSnapshot(snapshot);
900
901         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
902
903         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
904
905         assertEquals(1, installSnapshot.getChunkIndex());
906         assertEquals(3, installSnapshot.getTotalChunks());
907         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
908
909         int hashCode = installSnapshot.getData().hashCode();
910
911         followerActor.underlyingActor().clear();
912
913         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
914                 FOLLOWER_ID, 1, true));
915
916         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
917
918         assertEquals(2, installSnapshot.getChunkIndex());
919         assertEquals(3, installSnapshot.getTotalChunks());
920         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
921     }
922
923     @Test
924     public void testFollowerToSnapshotLogic() {
925         logStart("testFollowerToSnapshotLogic");
926
927         MockRaftActorContext actorContext = createActorContext();
928
929         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
930             @Override
931             public int getSnapshotChunkSize() {
932                 return 50;
933             }
934         });
935
936         leader = new Leader(actorContext);
937
938         Map<String, String> leadersSnapshot = new HashMap<>();
939         leadersSnapshot.put("1", "A");
940         leadersSnapshot.put("2", "B");
941         leadersSnapshot.put("3", "C");
942
943         ByteString bs = toByteString(leadersSnapshot);
944         byte[] barray = bs.toByteArray();
945
946         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
947         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
948
949         assertEquals(bs.size(), barray.length);
950
951         int chunkIndex=0;
952         for (int i=0; i < barray.length; i = i + 50) {
953             int j = i + 50;
954             chunkIndex++;
955
956             if (i + 50 > barray.length) {
957                 j = barray.length;
958             }
959
960             ByteString chunk = fts.getNextChunk();
961             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
962             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
963
964             fts.markSendStatus(true);
965             if (!fts.isLastChunk(chunkIndex)) {
966                 fts.incrementChunkIndex();
967             }
968         }
969
970         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
971     }
972
973     @Override protected RaftActorBehavior createBehavior(
974         RaftActorContext actorContext) {
975         return new Leader(actorContext);
976     }
977
978     @Override
979     protected MockRaftActorContext createActorContext() {
980         return createActorContext(leaderActor);
981     }
982
983     @Override
984     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
985         return createActorContext(LEADER_ID, actorRef);
986     }
987
988     private MockRaftActorContext createActorContextWithFollower() {
989         MockRaftActorContext actorContext = createActorContext();
990         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
991                 followerActor.path().toString()).build());
992         return actorContext;
993     }
994
995     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
996         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
997         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
998         configParams.setElectionTimeoutFactor(100000);
999         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1000         context.setConfigParams(configParams);
1001         context.setPayloadVersion(payloadVersion);
1002         return context;
1003     }
1004
1005     private MockRaftActorContext createFollowerActorContextWithLeader() {
1006         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1007         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1008         followerConfig.setElectionTimeoutFactor(10000);
1009         followerActorContext.setConfigParams(followerConfig);
1010         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1011         return followerActorContext;
1012     }
1013
1014     @Test
1015     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1016         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1017
1018         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1019
1020         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1021
1022         Follower follower = new Follower(followerActorContext);
1023         followerActor.underlyingActor().setBehavior(follower);
1024
1025         Map<String, String> peerAddresses = new HashMap<>();
1026         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1027
1028         leaderActorContext.setPeerAddresses(peerAddresses);
1029
1030         leaderActorContext.getReplicatedLog().removeFrom(0);
1031
1032         //create 3 entries
1033         leaderActorContext.setReplicatedLog(
1034                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1035
1036         leaderActorContext.setCommitIndex(1);
1037
1038         followerActorContext.getReplicatedLog().removeFrom(0);
1039
1040         // follower too has the exact same log entries and has the same commit index
1041         followerActorContext.setReplicatedLog(
1042                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1043
1044         followerActorContext.setCommitIndex(1);
1045
1046         leader = new Leader(leaderActorContext);
1047
1048         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1049
1050         assertEquals(1, appendEntries.getLeaderCommit());
1051         assertEquals(0, appendEntries.getEntries().size());
1052         assertEquals(0, appendEntries.getPrevLogIndex());
1053
1054         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1055                 leaderActor, AppendEntriesReply.class);
1056
1057         assertEquals(2, appendEntriesReply.getLogLastIndex());
1058         assertEquals(1, appendEntriesReply.getLogLastTerm());
1059
1060         // follower returns its next index
1061         assertEquals(2, appendEntriesReply.getLogLastIndex());
1062         assertEquals(1, appendEntriesReply.getLogLastTerm());
1063
1064         follower.close();
1065     }
1066
1067     @Test
1068     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1069         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1070
1071         MockRaftActorContext leaderActorContext = createActorContext();
1072
1073         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1074         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1075
1076         Follower follower = new Follower(followerActorContext);
1077         followerActor.underlyingActor().setBehavior(follower);
1078
1079         Map<String, String> leaderPeerAddresses = new HashMap<>();
1080         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1081
1082         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1083
1084         leaderActorContext.getReplicatedLog().removeFrom(0);
1085
1086         leaderActorContext.setReplicatedLog(
1087                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1088
1089         leaderActorContext.setCommitIndex(1);
1090
1091         followerActorContext.getReplicatedLog().removeFrom(0);
1092
1093         followerActorContext.setReplicatedLog(
1094                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1095
1096         // follower has the same log entries but its commit index > leaders commit index
1097         followerActorContext.setCommitIndex(2);
1098
1099         leader = new Leader(leaderActorContext);
1100
1101         // Initial heartbeat
1102         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1103
1104         assertEquals(1, appendEntries.getLeaderCommit());
1105         assertEquals(0, appendEntries.getEntries().size());
1106         assertEquals(0, appendEntries.getPrevLogIndex());
1107
1108         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1109                 leaderActor, AppendEntriesReply.class);
1110
1111         assertEquals(2, appendEntriesReply.getLogLastIndex());
1112         assertEquals(1, appendEntriesReply.getLogLastTerm());
1113
1114         leaderActor.underlyingActor().setBehavior(follower);
1115         leader.handleMessage(followerActor, appendEntriesReply);
1116
1117         leaderActor.underlyingActor().clear();
1118         followerActor.underlyingActor().clear();
1119
1120         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1121                 TimeUnit.MILLISECONDS);
1122
1123         leader.handleMessage(leaderActor, new SendHeartBeat());
1124
1125         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1126
1127         assertEquals(2, appendEntries.getLeaderCommit());
1128         assertEquals(0, appendEntries.getEntries().size());
1129         assertEquals(2, appendEntries.getPrevLogIndex());
1130
1131         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1132
1133         assertEquals(2, appendEntriesReply.getLogLastIndex());
1134         assertEquals(1, appendEntriesReply.getLogLastTerm());
1135
1136         assertEquals(2, followerActorContext.getCommitIndex());
1137
1138         follower.close();
1139     }
1140
1141     @Test
1142     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1143         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1144
1145         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1146         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1147                 new FiniteDuration(1000, TimeUnit.SECONDS));
1148
1149         leaderActorContext.setReplicatedLog(
1150                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1151         long leaderCommitIndex = 2;
1152         leaderActorContext.setCommitIndex(leaderCommitIndex);
1153         leaderActorContext.setLastApplied(leaderCommitIndex);
1154
1155         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1156         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1157
1158         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1159
1160         followerActorContext.setReplicatedLog(
1161                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1162         followerActorContext.setCommitIndex(0);
1163         followerActorContext.setLastApplied(0);
1164
1165         Follower follower = new Follower(followerActorContext);
1166         followerActor.underlyingActor().setBehavior(follower);
1167
1168         leader = new Leader(leaderActorContext);
1169
1170         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1171         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1172
1173         MessageCollectorActor.clearMessages(followerActor);
1174         MessageCollectorActor.clearMessages(leaderActor);
1175
1176         // Verify initial AppendEntries sent with the leader's current commit index.
1177         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1178         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1179         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1180
1181         leaderActor.underlyingActor().setBehavior(leader);
1182
1183         leader.handleMessage(followerActor, appendEntriesReply);
1184
1185         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1186         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1187
1188         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1189         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1190         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1191
1192         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1193         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1194                 appendEntries.getEntries().get(0).getData());
1195         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1196         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1197                 appendEntries.getEntries().get(1).getData());
1198
1199         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1200         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1201
1202         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1203
1204         ApplyState applyState = applyStateList.get(0);
1205         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1206         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1207         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1208                 applyState.getReplicatedLogEntry().getData());
1209
1210         applyState = applyStateList.get(1);
1211         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1212         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1213         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1214                 applyState.getReplicatedLogEntry().getData());
1215
1216         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1217         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1218     }
1219
1220     @Test
1221     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1222         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1223
1224         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1225         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1226                 new FiniteDuration(1000, TimeUnit.SECONDS));
1227
1228         leaderActorContext.setReplicatedLog(
1229                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1230         long leaderCommitIndex = 1;
1231         leaderActorContext.setCommitIndex(leaderCommitIndex);
1232         leaderActorContext.setLastApplied(leaderCommitIndex);
1233
1234         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1235         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1236
1237         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1238
1239         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1240         followerActorContext.setCommitIndex(-1);
1241         followerActorContext.setLastApplied(-1);
1242
1243         Follower follower = new Follower(followerActorContext);
1244         followerActor.underlyingActor().setBehavior(follower);
1245
1246         leader = new Leader(leaderActorContext);
1247
1248         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1249         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1250
1251         MessageCollectorActor.clearMessages(followerActor);
1252         MessageCollectorActor.clearMessages(leaderActor);
1253
1254         // Verify initial AppendEntries sent with the leader's current commit index.
1255         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1256         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1257         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1258
1259         leaderActor.underlyingActor().setBehavior(leader);
1260
1261         leader.handleMessage(followerActor, appendEntriesReply);
1262
1263         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1264         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1265
1266         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1267         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1268         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1269
1270         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1271         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1272                 appendEntries.getEntries().get(0).getData());
1273         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1274         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1275                 appendEntries.getEntries().get(1).getData());
1276
1277         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1278         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1279
1280         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1281
1282         ApplyState applyState = applyStateList.get(0);
1283         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1284         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1285         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1286                 applyState.getReplicatedLogEntry().getData());
1287
1288         applyState = applyStateList.get(1);
1289         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1290         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1291         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1292                 applyState.getReplicatedLogEntry().getData());
1293
1294         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1295         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1296     }
1297
1298     @Test
1299     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1300         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1301
1302         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1303         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1304                 new FiniteDuration(1000, TimeUnit.SECONDS));
1305
1306         leaderActorContext.setReplicatedLog(
1307                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1308         long leaderCommitIndex = 1;
1309         leaderActorContext.setCommitIndex(leaderCommitIndex);
1310         leaderActorContext.setLastApplied(leaderCommitIndex);
1311
1312         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1313         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1314
1315         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1316
1317         followerActorContext.setReplicatedLog(
1318                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1319         followerActorContext.setCommitIndex(-1);
1320         followerActorContext.setLastApplied(-1);
1321
1322         Follower follower = new Follower(followerActorContext);
1323         followerActor.underlyingActor().setBehavior(follower);
1324
1325         leader = new Leader(leaderActorContext);
1326
1327         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1328         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1329
1330         MessageCollectorActor.clearMessages(followerActor);
1331         MessageCollectorActor.clearMessages(leaderActor);
1332
1333         // Verify initial AppendEntries sent with the leader's current commit index.
1334         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1335         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1336         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1337
1338         leaderActor.underlyingActor().setBehavior(leader);
1339
1340         leader.handleMessage(followerActor, appendEntriesReply);
1341
1342         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1343         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1344
1345         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1346         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1347         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1348
1349         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1350         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1351         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1352                 appendEntries.getEntries().get(0).getData());
1353         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1354         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1355         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1356                 appendEntries.getEntries().get(1).getData());
1357
1358         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1359         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1360
1361         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1362
1363         ApplyState applyState = applyStateList.get(0);
1364         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1365         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1366         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1367                 applyState.getReplicatedLogEntry().getData());
1368
1369         applyState = applyStateList.get(1);
1370         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1371         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1372         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1373                 applyState.getReplicatedLogEntry().getData());
1374
1375         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1376         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1377         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1378     }
1379
1380     @Test
1381     public void testHandleAppendEntriesReplySuccess() throws Exception {
1382         logStart("testHandleAppendEntriesReplySuccess");
1383
1384         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1385
1386         leaderActorContext.setReplicatedLog(
1387                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1388
1389         leaderActorContext.setCommitIndex(1);
1390         leaderActorContext.setLastApplied(1);
1391         leaderActorContext.getTermInformation().update(1, "leader");
1392
1393         leader = new Leader(leaderActorContext);
1394
1395         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1396
1397         short payloadVersion = 5;
1398         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1399
1400         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1401
1402         assertEquals(RaftState.Leader, raftActorBehavior.state());
1403
1404         assertEquals(2, leaderActorContext.getCommitIndex());
1405
1406         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1407                 leaderActor, ApplyJournalEntries.class);
1408
1409         assertEquals(2, leaderActorContext.getLastApplied());
1410
1411         assertEquals(2, applyJournalEntries.getToIndex());
1412
1413         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1414                 ApplyState.class);
1415
1416         assertEquals(1,applyStateList.size());
1417
1418         ApplyState applyState = applyStateList.get(0);
1419
1420         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1421
1422         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1423         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1424     }
1425
1426     @Test
1427     public void testHandleAppendEntriesReplyUnknownFollower(){
1428         logStart("testHandleAppendEntriesReplyUnknownFollower");
1429
1430         MockRaftActorContext leaderActorContext = createActorContext();
1431
1432         leader = new Leader(leaderActorContext);
1433
1434         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1435
1436         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1437
1438         assertEquals(RaftState.Leader, raftActorBehavior.state());
1439     }
1440
1441     @Test
1442     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1443         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1444
1445         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1446         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1447                 new FiniteDuration(1000, TimeUnit.SECONDS));
1448         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1449
1450         leaderActorContext.setReplicatedLog(
1451                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1452         long leaderCommitIndex = 3;
1453         leaderActorContext.setCommitIndex(leaderCommitIndex);
1454         leaderActorContext.setLastApplied(leaderCommitIndex);
1455
1456         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1457         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1458         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1459         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1460
1461         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1462
1463         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1464         followerActorContext.setCommitIndex(-1);
1465         followerActorContext.setLastApplied(-1);
1466
1467         Follower follower = new Follower(followerActorContext);
1468         followerActor.underlyingActor().setBehavior(follower);
1469
1470         leader = new Leader(leaderActorContext);
1471
1472         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1473         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1474
1475         MessageCollectorActor.clearMessages(followerActor);
1476         MessageCollectorActor.clearMessages(leaderActor);
1477
1478         // Verify initial AppendEntries sent with the leader's current commit index.
1479         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1480         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1481         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1482
1483         leaderActor.underlyingActor().setBehavior(leader);
1484
1485         leader.handleMessage(followerActor, appendEntriesReply);
1486
1487         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1488         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1489
1490         appendEntries = appendEntriesList.get(0);
1491         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1492         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1493         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1494
1495         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1496         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1497                 appendEntries.getEntries().get(0).getData());
1498         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1499         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1500                 appendEntries.getEntries().get(1).getData());
1501
1502         appendEntries = appendEntriesList.get(1);
1503         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1504         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1505         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1506
1507         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1508         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1509                 appendEntries.getEntries().get(0).getData());
1510         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1511         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1512                 appendEntries.getEntries().get(1).getData());
1513
1514         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1515         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1516
1517         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1518
1519         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1520         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1521     }
1522
1523     @Test
1524     public void testHandleRequestVoteReply(){
1525         logStart("testHandleRequestVoteReply");
1526
1527         MockRaftActorContext leaderActorContext = createActorContext();
1528
1529         leader = new Leader(leaderActorContext);
1530
1531         // Should be a no-op.
1532         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1533                 new RequestVoteReply(1, true));
1534
1535         assertEquals(RaftState.Leader, raftActorBehavior.state());
1536
1537         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1538
1539         assertEquals(RaftState.Leader, raftActorBehavior.state());
1540     }
1541
1542     @Test
1543     public void testIsolatedLeaderCheckNoFollowers() {
1544         logStart("testIsolatedLeaderCheckNoFollowers");
1545
1546         MockRaftActorContext leaderActorContext = createActorContext();
1547
1548         leader = new Leader(leaderActorContext);
1549         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1550         Assert.assertTrue(behavior instanceof Leader);
1551     }
1552
1553     @Test
1554     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1555         logStart("testIsolatedLeaderCheckTwoFollowers");
1556
1557         new JavaTestKit(getSystem()) {{
1558
1559             ActorRef followerActor1 = getTestActor();
1560             ActorRef followerActor2 = getTestActor();
1561
1562             MockRaftActorContext leaderActorContext = createActorContext();
1563
1564             Map<String, String> peerAddresses = new HashMap<>();
1565             peerAddresses.put("follower-1", followerActor1.path().toString());
1566             peerAddresses.put("follower-2", followerActor2.path().toString());
1567
1568             leaderActorContext.setPeerAddresses(peerAddresses);
1569
1570             leader = new Leader(leaderActorContext);
1571
1572             leader.markFollowerActive("follower-1");
1573             leader.markFollowerActive("follower-2");
1574             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1575             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1576                 behavior instanceof Leader);
1577
1578             // kill 1 follower and verify if that got killed
1579             final JavaTestKit probe = new JavaTestKit(getSystem());
1580             probe.watch(followerActor1);
1581             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1582             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1583             assertEquals(termMsg1.getActor(), followerActor1);
1584
1585             leader.markFollowerInActive("follower-1");
1586             leader.markFollowerActive("follower-2");
1587             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1588             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1589                 behavior instanceof Leader);
1590
1591             // kill 2nd follower and leader should change to Isolated leader
1592             followerActor2.tell(PoisonPill.getInstance(), null);
1593             probe.watch(followerActor2);
1594             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1595             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1596             assertEquals(termMsg2.getActor(), followerActor2);
1597
1598             leader.markFollowerInActive("follower-2");
1599             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1600             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1601                 behavior instanceof IsolatedLeader);
1602         }};
1603     }
1604
1605     @Test
1606     public void testLaggingFollowerStarvation() throws Exception {
1607         logStart("testLaggingFollowerStarvation");
1608         new JavaTestKit(getSystem()) {{
1609             String leaderActorId = actorFactory.generateActorId("leader");
1610             String follower1ActorId = actorFactory.generateActorId("follower");
1611             String follower2ActorId = actorFactory.generateActorId("follower");
1612
1613             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1614                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1615             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1616             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1617
1618             MockRaftActorContext leaderActorContext =
1619                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1620
1621             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1622             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1623             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1624
1625             leaderActorContext.setConfigParams(configParams);
1626
1627             leaderActorContext.setReplicatedLog(
1628                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1629
1630             Map<String, String> peerAddresses = new HashMap<>();
1631             peerAddresses.put(follower1ActorId,
1632                     follower1Actor.path().toString());
1633             peerAddresses.put(follower2ActorId,
1634                     follower2Actor.path().toString());
1635
1636             leaderActorContext.setPeerAddresses(peerAddresses);
1637             leaderActorContext.getTermInformation().update(1, leaderActorId);
1638
1639             RaftActorBehavior leader = createBehavior(leaderActorContext);
1640
1641             leaderActor.underlyingActor().setBehavior(leader);
1642
1643             for(int i=1;i<6;i++) {
1644                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1645                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1646                 assertTrue(newBehavior == leader);
1647                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1648             }
1649
1650             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1651             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1652
1653             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1654                     heartbeats.size() > 1);
1655
1656             // Check if follower-2 got AppendEntries during this time and was not starved
1657             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1658
1659             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1660                     appendEntries.size() > 1);
1661
1662         }};
1663     }
1664
1665     @Override
1666     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1667             ActorRef actorRef, RaftRPC rpc) throws Exception {
1668         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1669         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1670     }
1671
1672     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1673
1674         private final long electionTimeOutIntervalMillis;
1675         private final int snapshotChunkSize;
1676
1677         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1678             super();
1679             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1680             this.snapshotChunkSize = snapshotChunkSize;
1681         }
1682
1683         @Override
1684         public FiniteDuration getElectionTimeOutInterval() {
1685             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1686         }
1687
1688         @Override
1689         public int getSnapshotChunkSize() {
1690             return snapshotChunkSize;
1691         }
1692     }
1693 }