BUG 2185 : Introduce RaftPolicy & DefaultRaftPolicy
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.collect.ImmutableMap;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.google.protobuf.ByteString;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.Snapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
35 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
36 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
44 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
45 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
46 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
47 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
48 import scala.concurrent.duration.FiniteDuration;
49
50 public class LeaderTest extends AbstractLeaderTest {
51
52     static final String FOLLOWER_ID = "follower"; // follower id passed to leader.getFollower(...) and used in simulated AppendEntriesReply messages
53     public static final String LEADER_ID = "leader"; // NOTE(review): not referenced in the visible part of this class - presumably the leader's id; confirm
54
55     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor( // actor passed as sender for SendHeartBeat/Replicate; ApplyState messages are collected from it
56             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
57
58     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor( // actor from which the tests collect AppendEntries/InstallSnapshot messages
59             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
60
61     private Leader leader; // behavior under test; created by each test and closed in tearDown()
62     private final short payloadVersion = 5; // NOTE(review): shadowed by a local of the same name and value in testThatLeaderSendsAHeartbeatMessageToAllFollowers
63
64     @Override
65     @After
66     public void tearDown() throws Exception {
67         if(leader != null) {
68             leader.close();
69         }
70
71         super.tearDown();
72     }
73
74     @Test
75     public void testHandleMessageForUnknownMessage() throws Exception {
76         logStart("testHandleMessageForUnknownMessage");
77
78         leader = new Leader(createActorContext());
79
80         // handle message should return the Leader state when it receives an
81         // unknown message
82         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
83         Assert.assertTrue(behavior instanceof Leader);
84     }
85
86     @Test
87     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
88         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
89
90         MockRaftActorContext actorContext = createActorContextWithFollower();
91         short payloadVersion = (short)5;
92         actorContext.setPayloadVersion(payloadVersion);
93
94         long term = 1;
95         actorContext.getTermInformation().update(term, "");
96
97         leader = new Leader(actorContext);
98
99         // Leader should send an immediate heartbeat with no entries as follower is inactive.
100         long lastIndex = actorContext.getReplicatedLog().lastIndex();
101         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
102         assertEquals("getTerm", term, appendEntries.getTerm());
103         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
104         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
105         assertEquals("Entries size", 0, appendEntries.getEntries().size());
106         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
107
108         // The follower would normally reply - simulate that explicitly here.
109         leader.handleMessage(followerActor, new AppendEntriesReply(
110                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
111         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
112
113         followerActor.underlyingActor().clear();
114
115         // Sleep for the heartbeat interval so AppendEntries is sent.
116         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
117                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
118
119         leader.handleMessage(leaderActor, new SendHeartBeat());
120
121         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
122         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 1, appendEntries.getEntries().size());
125         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
126         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
127         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
128     }
129
130
131     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
132         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
133         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
134                 1, index, payload);
135         actorContext.getReplicatedLog().append(newEntry);
136         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
137     }
138
139     @Test
140     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
141         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
142
143         MockRaftActorContext actorContext = createActorContextWithFollower();
144
145         long term = 1;
146         actorContext.getTermInformation().update(term, "");
147
148         leader = new Leader(actorContext);
149
150         // Leader will send an immediate heartbeat - ignore it.
151         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
152
153         // The follower would normally reply - simulate that explicitly here.
154         long lastIndex = actorContext.getReplicatedLog().lastIndex();
155         leader.handleMessage(followerActor, new AppendEntriesReply(
156                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
157         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
158
159         followerActor.underlyingActor().clear();
160
161         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
162
163         // State should not change
164         assertTrue(raftBehavior instanceof Leader);
165
166         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
167         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
168         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
169         assertEquals("Entries size", 1, appendEntries.getEntries().size());
170         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
171         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
172         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
173         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
174     }
175
176     @Test
177     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
178         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
179
180         MockRaftActorContext actorContext = createActorContextWithFollower();
181         actorContext.setRaftPolicy(createRaftPolicy(true, true));
182
183         long term = 1;
184         actorContext.getTermInformation().update(term, "");
185
186         leader = new Leader(actorContext);
187
188         // Leader will send an immediate heartbeat - ignore it.
189         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190
191         // The follower would normally reply - simulate that explicitly here.
192         long lastIndex = actorContext.getReplicatedLog().lastIndex();
193         leader.handleMessage(followerActor, new AppendEntriesReply(
194                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
195         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
196
197         followerActor.underlyingActor().clear();
198
199         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
200
201         // State should not change
202         assertTrue(raftBehavior instanceof Leader);
203
204         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
205         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
206         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
207         assertEquals("Entries size", 1, appendEntries.getEntries().size());
208         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
209         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
210         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
211         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
212     }
213
214     @Test
215     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
216         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
217
218         MockRaftActorContext actorContext = createActorContextWithFollower();
219         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
220             @Override
221             public FiniteDuration getHeartBeatInterval() {
222                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
223             }
224         });
225
226         long term = 1;
227         actorContext.getTermInformation().update(term, "");
228
229         leader = new Leader(actorContext);
230
231         // Leader will send an immediate heartbeat - ignore it.
232         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233
234         // The follower would normally reply - simulate that explicitly here.
235         long lastIndex = actorContext.getReplicatedLog().lastIndex();
236         leader.handleMessage(followerActor, new AppendEntriesReply(
237                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
238         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
239
240         followerActor.underlyingActor().clear();
241
242         for(int i=0;i<5;i++) {
243             sendReplicate(actorContext, lastIndex+i+1);
244         }
245
246         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
247         // We expect only 1 message to be sent because of two reasons,
248         // - an append entries reply was not received
249         // - the heartbeat interval has not expired
250         // In this scenario if multiple messages are sent they would likely be duplicates
251         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
252     }
253
254     @Test
255     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
256         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
257
258         MockRaftActorContext actorContext = createActorContextWithFollower();
259         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
260             @Override
261             public FiniteDuration getHeartBeatInterval() {
262                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
263             }
264         });
265
266         long term = 1;
267         actorContext.getTermInformation().update(term, "");
268
269         leader = new Leader(actorContext);
270
271         // Leader will send an immediate heartbeat - ignore it.
272         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
273
274         // The follower would normally reply - simulate that explicitly here.
275         long lastIndex = actorContext.getReplicatedLog().lastIndex();
276         leader.handleMessage(followerActor, new AppendEntriesReply(
277                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
278         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
279
280         followerActor.underlyingActor().clear();
281
282         for(int i=0;i<3;i++) {
283             sendReplicate(actorContext, lastIndex+i+1);
284             leader.handleMessage(followerActor, new AppendEntriesReply(
285                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
286
287         }
288
289         for(int i=3;i<5;i++) {
290             sendReplicate(actorContext, lastIndex + i + 1);
291         }
292
293         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
294         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
295         // get sent to the follower - but not the 5th
296         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
297
298         for(int i=0;i<4;i++) {
299             long expected = allMessages.get(i).getEntries().get(0).getIndex();
300             assertEquals(expected, i+2);
301         }
302     }
303
304     @Test
305     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
306         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
307
308         MockRaftActorContext actorContext = createActorContextWithFollower();
309         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
310             @Override
311             public FiniteDuration getHeartBeatInterval() {
312                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
313             }
314         });
315
316         long term = 1;
317         actorContext.getTermInformation().update(term, "");
318
319         leader = new Leader(actorContext);
320
321         // Leader will send an immediate heartbeat - ignore it.
322         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
323
324         // The follower would normally reply - simulate that explicitly here.
325         long lastIndex = actorContext.getReplicatedLog().lastIndex();
326         leader.handleMessage(followerActor, new AppendEntriesReply(
327                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
328         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
329
330         followerActor.underlyingActor().clear();
331
332         sendReplicate(actorContext, lastIndex+1);
333
334         // Wait slightly longer than heartbeat duration
335         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
336
337         leader.handleMessage(leaderActor, new SendHeartBeat());
338
339         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
340         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
341
342         assertEquals(1, allMessages.get(0).getEntries().size());
343         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
344         assertEquals(1, allMessages.get(1).getEntries().size());
345         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
346
347     }
348
349     @Test
350     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
351         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
352
353         MockRaftActorContext actorContext = createActorContextWithFollower();
354         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
355             @Override
356             public FiniteDuration getHeartBeatInterval() {
357                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
358             }
359         });
360
361         long term = 1;
362         actorContext.getTermInformation().update(term, "");
363
364         leader = new Leader(actorContext);
365
366         // Leader will send an immediate heartbeat - ignore it.
367         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
368
369         // The follower would normally reply - simulate that explicitly here.
370         long lastIndex = actorContext.getReplicatedLog().lastIndex();
371         leader.handleMessage(followerActor, new AppendEntriesReply(
372                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
373         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
374
375         followerActor.underlyingActor().clear();
376
377         for(int i=0;i<3;i++) {
378             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
379             leader.handleMessage(leaderActor, new SendHeartBeat());
380         }
381
382         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
383         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
384     }
385
386     @Test
387     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
388         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
389
390         MockRaftActorContext actorContext = createActorContextWithFollower();
391         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
392             @Override
393             public FiniteDuration getHeartBeatInterval() {
394                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
395             }
396         });
397
398         long term = 1;
399         actorContext.getTermInformation().update(term, "");
400
401         leader = new Leader(actorContext);
402
403         // Leader will send an immediate heartbeat - ignore it.
404         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
405
406         // The follower would normally reply - simulate that explicitly here.
407         long lastIndex = actorContext.getReplicatedLog().lastIndex();
408         leader.handleMessage(followerActor, new AppendEntriesReply(
409                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
410         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
411
412         followerActor.underlyingActor().clear();
413
414         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
415         leader.handleMessage(leaderActor, new SendHeartBeat());
416         sendReplicate(actorContext, lastIndex+1);
417
418         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
419         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
420
421         assertEquals(0, allMessages.get(0).getEntries().size());
422         assertEquals(1, allMessages.get(1).getEntries().size());
423     }
424
425
426     @Test
427     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
428         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
429
430         MockRaftActorContext actorContext = createActorContext();
431
432         leader = new Leader(actorContext);
433
434         actorContext.setLastApplied(0);
435
436         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
437         long term = actorContext.getTermInformation().getCurrentTerm();
438         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
439                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
440
441         actorContext.getReplicatedLog().append(newEntry);
442
443         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
444                 new Replicate(leaderActor, "state-id", newEntry));
445
446         // State should not change
447         assertTrue(raftBehavior instanceof Leader);
448
449         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
450
451         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
452         // one since lastApplied state is 0.
453         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
454                 leaderActor, ApplyState.class);
455         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
456
457         for(int i = 0; i <= newLogIndex - 1; i++ ) {
458             ApplyState applyState = applyStateList.get(i);
459             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
460             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
461         }
462
463         ApplyState last = applyStateList.get((int) newLogIndex - 1);
464         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
465         assertEquals("getIdentifier", "state-id", last.getIdentifier());
466     }
467
468     @Test
469     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
470         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
471
472         MockRaftActorContext actorContext = createActorContextWithFollower();
473
474         Map<String, String> leadersSnapshot = new HashMap<>();
475         leadersSnapshot.put("1", "A");
476         leadersSnapshot.put("2", "B");
477         leadersSnapshot.put("3", "C");
478
479         //clears leaders log
480         actorContext.getReplicatedLog().removeFrom(0);
481
482         final int commitIndex = 3;
483         final int snapshotIndex = 2;
484         final int newEntryIndex = 4;
485         final int snapshotTerm = 1;
486         final int currentTerm = 2;
487
488         // set the snapshot variables in replicatedlog
489         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
490         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
491         actorContext.setCommitIndex(commitIndex);
492         //set follower timeout to 2 mins, helps during debugging
493         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
494
495         leader = new Leader(actorContext);
496
497         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
498         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
499
500         // new entry
501         ReplicatedLogImplEntry entry =
502                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
503                         new MockRaftActorContext.MockPayload("D"));
504
505         //update follower timestamp
506         leader.markFollowerActive(FOLLOWER_ID);
507
508         ByteString bs = toByteString(leadersSnapshot);
509         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
510                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
511         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
512         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
513
514         //send first chunk and no InstallSnapshotReply received yet
515         fts.getNextChunk();
516         fts.incrementChunkIndex();
517
518         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
519                 TimeUnit.MILLISECONDS);
520
521         leader.handleMessage(leaderActor, new SendHeartBeat());
522
523         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
524
525         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
526
527         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
528
529         //InstallSnapshotReply received
530         fts.markSendStatus(true);
531
532         leader.handleMessage(leaderActor, new SendHeartBeat());
533
534         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
535
536         assertEquals(commitIndex, is.getLastIncludedIndex());
537     }
538
539     @Test
540     public void testSendAppendEntriesSnapshotScenario() throws Exception {
541         logStart("testSendAppendEntriesSnapshotScenario");
542
543         MockRaftActorContext actorContext = createActorContextWithFollower();
544
545         Map<String, String> leadersSnapshot = new HashMap<>();
546         leadersSnapshot.put("1", "A");
547         leadersSnapshot.put("2", "B");
548         leadersSnapshot.put("3", "C");
549
550         //clears leaders log
551         actorContext.getReplicatedLog().removeFrom(0);
552
553         final int followersLastIndex = 2;
554         final int snapshotIndex = 3;
555         final int newEntryIndex = 4;
556         final int snapshotTerm = 1;
557         final int currentTerm = 2;
558
559         // set the snapshot variables in replicatedlog
560         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
561         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
562         actorContext.setCommitIndex(followersLastIndex);
563
564         leader = new Leader(actorContext);
565
566         // Leader will send an immediate heartbeat - ignore it.
567         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
568
569         // new entry
570         ReplicatedLogImplEntry entry =
571                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
572                         new MockRaftActorContext.MockPayload("D"));
573
574         actorContext.getReplicatedLog().append(entry);
575
576         //update follower timestamp
577         leader.markFollowerActive(FOLLOWER_ID);
578
579         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
580         RaftActorBehavior raftBehavior = leader.handleMessage(
581                 leaderActor, new Replicate(null, "state-id", entry));
582
583         assertTrue(raftBehavior instanceof Leader);
584
585         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
586     }
587
588     @Test
589     public void testInitiateInstallSnapshot() throws Exception {
590         logStart("testInitiateInstallSnapshot");
591
592         MockRaftActorContext actorContext = createActorContextWithFollower();
593
594         Map<String, String> leadersSnapshot = new HashMap<>();
595         leadersSnapshot.put("1", "A");
596         leadersSnapshot.put("2", "B");
597         leadersSnapshot.put("3", "C");
598
599         //clears leaders log
600         actorContext.getReplicatedLog().removeFrom(0);
601
602         final int followersLastIndex = 2;
603         final int snapshotIndex = 3;
604         final int newEntryIndex = 4;
605         final int snapshotTerm = 1;
606         final int currentTerm = 2;
607
608         // set the snapshot variables in replicatedlog
609         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
610         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
611         actorContext.setLastApplied(3);
612         actorContext.setCommitIndex(followersLastIndex);
613
614         leader = new Leader(actorContext);
615
616         // Leader will send an immediate heartbeat - ignore it.
617         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
618
619         // set the snapshot as absent and check if capture-snapshot is invoked.
620         leader.setSnapshot(null);
621
622         // new entry
623         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
624                 new MockRaftActorContext.MockPayload("D"));
625
626         actorContext.getReplicatedLog().append(entry);
627
628         //update follower timestamp
629         leader.markFollowerActive(FOLLOWER_ID);
630
631         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
632
633         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
634
635         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
636
637         assertTrue(cs.isInstallSnapshotInitiated());
638         assertEquals(3, cs.getLastAppliedIndex());
639         assertEquals(1, cs.getLastAppliedTerm());
640         assertEquals(4, cs.getLastIndex());
641         assertEquals(2, cs.getLastTerm());
642
643         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
644         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
645
646         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
647     }
648
649     @Test
650     public void testInstallSnapshot() throws Exception {
651         logStart("testInstallSnapshot");
652
653         MockRaftActorContext actorContext = createActorContextWithFollower();
654
655         Map<String, String> leadersSnapshot = new HashMap<>();
656         leadersSnapshot.put("1", "A");
657         leadersSnapshot.put("2", "B");
658         leadersSnapshot.put("3", "C");
659
660         //clears leaders log
661         actorContext.getReplicatedLog().removeFrom(0);
662
663         final int lastAppliedIndex = 3;
664         final int snapshotIndex = 2;
665         final int snapshotTerm = 1;
666         final int currentTerm = 2;
667
668         // set the snapshot variables in replicatedlog
669         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
670         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
671         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
672         actorContext.setCommitIndex(lastAppliedIndex);
673         actorContext.setLastApplied(lastAppliedIndex);
674
675         leader = new Leader(actorContext);
676
677         // Initial heartbeat.
678         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
679
680         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
681         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
682
683         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
684                 Collections.<ReplicatedLogEntry>emptyList(),
685                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
686
687         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
688
689         assertTrue(raftBehavior instanceof Leader);
690
691         // check if installsnapshot gets called with the correct values.
692
693         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
694
695         assertNotNull(installSnapshot.getData());
696         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
697         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
698
699         assertEquals(currentTerm, installSnapshot.getTerm());
700     }
701
702     @Test
703     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
704         logStart("testHandleInstallSnapshotReplyLastChunk");
705
706         MockRaftActorContext actorContext = createActorContextWithFollower();
707
708         final int commitIndex = 3;
709         final int snapshotIndex = 2;
710         final int snapshotTerm = 1;
711         final int currentTerm = 2;
712
713         actorContext.setCommitIndex(commitIndex);
714
715         leader = new Leader(actorContext);
716
717         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
718         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
719
720         // Ignore initial heartbeat.
721         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
722
723         Map<String, String> leadersSnapshot = new HashMap<>();
724         leadersSnapshot.put("1", "A");
725         leadersSnapshot.put("2", "B");
726         leadersSnapshot.put("3", "C");
727
728         // set the snapshot variables in replicatedlog
729
730         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
731         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
732         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
733
734         ByteString bs = toByteString(leadersSnapshot);
735         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
736                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
737         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
738         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
739         while(!fts.isLastChunk(fts.getChunkIndex())) {
740             fts.getNextChunk();
741             fts.incrementChunkIndex();
742         }
743
744         //clears leaders log
745         actorContext.getReplicatedLog().removeFrom(0);
746
747         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
748                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
749
750         assertTrue(raftBehavior instanceof Leader);
751
752         assertEquals(0, leader.followerSnapshotSize());
753         assertEquals(1, leader.followerLogSize());
754         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
755         assertNotNull(fli);
756         assertEquals(commitIndex, fli.getMatchIndex());
757         assertEquals(commitIndex + 1, fli.getNextIndex());
758     }
759
760     @Test
761     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
762         logStart("testSendSnapshotfromInstallSnapshotReply");
763
764         MockRaftActorContext actorContext = createActorContextWithFollower();
765
766         final int commitIndex = 3;
767         final int snapshotIndex = 2;
768         final int snapshotTerm = 1;
769         final int currentTerm = 2;
770
771         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
772             @Override
773             public int getSnapshotChunkSize() {
774                 return 50;
775             }
776         };
777         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
778         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
779
780         actorContext.setConfigParams(configParams);
781         actorContext.setCommitIndex(commitIndex);
782
783         leader = new Leader(actorContext);
784
785         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
786         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
787
788         Map<String, String> leadersSnapshot = new HashMap<>();
789         leadersSnapshot.put("1", "A");
790         leadersSnapshot.put("2", "B");
791         leadersSnapshot.put("3", "C");
792
793         // set the snapshot variables in replicatedlog
794         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
795         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
796         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
797
798         ByteString bs = toByteString(leadersSnapshot);
799         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
800                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
801         leader.setSnapshot(snapshot);
802
803         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
804
805         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
806
807         assertEquals(1, installSnapshot.getChunkIndex());
808         assertEquals(3, installSnapshot.getTotalChunks());
809
810         followerActor.underlyingActor().clear();
811         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
812                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
813
814         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
815
816         assertEquals(2, installSnapshot.getChunkIndex());
817         assertEquals(3, installSnapshot.getTotalChunks());
818
819         followerActor.underlyingActor().clear();
820         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
821                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
822
823         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
824
825         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
826         followerActor.underlyingActor().clear();
827         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
828                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
829
830         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
831
832         Assert.assertNull(installSnapshot);
833     }
834
835
836     @Test
837     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
838         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
839
840         MockRaftActorContext actorContext = createActorContextWithFollower();
841
842         final int commitIndex = 3;
843         final int snapshotIndex = 2;
844         final int snapshotTerm = 1;
845         final int currentTerm = 2;
846
847         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
848             @Override
849             public int getSnapshotChunkSize() {
850                 return 50;
851             }
852         });
853
854         actorContext.setCommitIndex(commitIndex);
855
856         leader = new Leader(actorContext);
857
858         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
859         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
860
861         Map<String, String> leadersSnapshot = new HashMap<>();
862         leadersSnapshot.put("1", "A");
863         leadersSnapshot.put("2", "B");
864         leadersSnapshot.put("3", "C");
865
866         // set the snapshot variables in replicatedlog
867         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
868         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
869         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
870
871         ByteString bs = toByteString(leadersSnapshot);
872         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
873                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
874         leader.setSnapshot(snapshot);
875
876         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
877         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
878
879         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
880
881         assertEquals(1, installSnapshot.getChunkIndex());
882         assertEquals(3, installSnapshot.getTotalChunks());
883
884         followerActor.underlyingActor().clear();
885
886         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
887                 FOLLOWER_ID, -1, false));
888
889         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
890                 TimeUnit.MILLISECONDS);
891
892         leader.handleMessage(leaderActor, new SendHeartBeat());
893
894         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
895
896         assertEquals(1, installSnapshot.getChunkIndex());
897         assertEquals(3, installSnapshot.getTotalChunks());
898     }
899
900     @Test
901     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
902         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
903
904         MockRaftActorContext actorContext = createActorContextWithFollower();
905
906         final int commitIndex = 3;
907         final int snapshotIndex = 2;
908         final int snapshotTerm = 1;
909         final int currentTerm = 2;
910
911         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
912             @Override
913             public int getSnapshotChunkSize() {
914                 return 50;
915             }
916         });
917
918         actorContext.setCommitIndex(commitIndex);
919
920         leader = new Leader(actorContext);
921
922         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
923         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
924
925         Map<String, String> leadersSnapshot = new HashMap<>();
926         leadersSnapshot.put("1", "A");
927         leadersSnapshot.put("2", "B");
928         leadersSnapshot.put("3", "C");
929
930         // set the snapshot variables in replicatedlog
931         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
932         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
933         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
934
935         ByteString bs = toByteString(leadersSnapshot);
936         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
937                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
938         leader.setSnapshot(snapshot);
939
940         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
941
942         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
943
944         assertEquals(1, installSnapshot.getChunkIndex());
945         assertEquals(3, installSnapshot.getTotalChunks());
946         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
947
948         int hashCode = installSnapshot.getData().hashCode();
949
950         followerActor.underlyingActor().clear();
951
952         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
953                 FOLLOWER_ID, 1, true));
954
955         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
956
957         assertEquals(2, installSnapshot.getChunkIndex());
958         assertEquals(3, installSnapshot.getTotalChunks());
959         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
960     }
961
962     @Test
963     public void testFollowerToSnapshotLogic() {
964         logStart("testFollowerToSnapshotLogic");
965
966         MockRaftActorContext actorContext = createActorContext();
967
968         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
969             @Override
970             public int getSnapshotChunkSize() {
971                 return 50;
972             }
973         });
974
975         leader = new Leader(actorContext);
976
977         Map<String, String> leadersSnapshot = new HashMap<>();
978         leadersSnapshot.put("1", "A");
979         leadersSnapshot.put("2", "B");
980         leadersSnapshot.put("3", "C");
981
982         ByteString bs = toByteString(leadersSnapshot);
983         byte[] barray = bs.toByteArray();
984
985         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
986         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
987
988         assertEquals(bs.size(), barray.length);
989
990         int chunkIndex=0;
991         for (int i=0; i < barray.length; i = i + 50) {
992             int j = i + 50;
993             chunkIndex++;
994
995             if (i + 50 > barray.length) {
996                 j = barray.length;
997             }
998
999             ByteString chunk = fts.getNextChunk();
1000             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1001             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1002
1003             fts.markSendStatus(true);
1004             if (!fts.isLastChunk(chunkIndex)) {
1005                 fts.incrementChunkIndex();
1006             }
1007         }
1008
1009         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1010     }
1011
1012     @Override protected RaftActorBehavior createBehavior(
1013         RaftActorContext actorContext) {
1014         return new Leader(actorContext);
1015     }
1016
1017     @Override
1018     protected MockRaftActorContext createActorContext() {
1019         return createActorContext(leaderActor);
1020     }
1021
1022     @Override
1023     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1024         return createActorContext(LEADER_ID, actorRef);
1025     }
1026
1027     private MockRaftActorContext createActorContextWithFollower() {
1028         MockRaftActorContext actorContext = createActorContext();
1029         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1030                 followerActor.path().toString()).build());
1031         return actorContext;
1032     }
1033
1034     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1035         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1036         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1037         configParams.setElectionTimeoutFactor(100000);
1038         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1039         context.setConfigParams(configParams);
1040         context.setPayloadVersion(payloadVersion);
1041         return context;
1042     }
1043
1044     private MockRaftActorContext createFollowerActorContextWithLeader() {
1045         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1046         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1047         followerConfig.setElectionTimeoutFactor(10000);
1048         followerActorContext.setConfigParams(followerConfig);
1049         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1050         return followerActorContext;
1051     }
1052
1053     @Test
1054     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1055         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1056
1057         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1058
1059         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1060
1061         Follower follower = new Follower(followerActorContext);
1062         followerActor.underlyingActor().setBehavior(follower);
1063
1064         Map<String, String> peerAddresses = new HashMap<>();
1065         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1066
1067         leaderActorContext.setPeerAddresses(peerAddresses);
1068
1069         leaderActorContext.getReplicatedLog().removeFrom(0);
1070
1071         //create 3 entries
1072         leaderActorContext.setReplicatedLog(
1073                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1074
1075         leaderActorContext.setCommitIndex(1);
1076
1077         followerActorContext.getReplicatedLog().removeFrom(0);
1078
1079         // follower too has the exact same log entries and has the same commit index
1080         followerActorContext.setReplicatedLog(
1081                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1082
1083         followerActorContext.setCommitIndex(1);
1084
1085         leader = new Leader(leaderActorContext);
1086
1087         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1088
1089         assertEquals(1, appendEntries.getLeaderCommit());
1090         assertEquals(0, appendEntries.getEntries().size());
1091         assertEquals(0, appendEntries.getPrevLogIndex());
1092
1093         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1094                 leaderActor, AppendEntriesReply.class);
1095
1096         assertEquals(2, appendEntriesReply.getLogLastIndex());
1097         assertEquals(1, appendEntriesReply.getLogLastTerm());
1098
1099         // follower returns its next index
1100         assertEquals(2, appendEntriesReply.getLogLastIndex());
1101         assertEquals(1, appendEntriesReply.getLogLastTerm());
1102
1103         follower.close();
1104     }
1105
1106     @Test
1107     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1108         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1109
1110         MockRaftActorContext leaderActorContext = createActorContext();
1111
1112         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1113         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1114
1115         Follower follower = new Follower(followerActorContext);
1116         followerActor.underlyingActor().setBehavior(follower);
1117
1118         Map<String, String> leaderPeerAddresses = new HashMap<>();
1119         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1120
1121         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1122
1123         leaderActorContext.getReplicatedLog().removeFrom(0);
1124
1125         leaderActorContext.setReplicatedLog(
1126                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1127
1128         leaderActorContext.setCommitIndex(1);
1129
1130         followerActorContext.getReplicatedLog().removeFrom(0);
1131
1132         followerActorContext.setReplicatedLog(
1133                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1134
1135         // follower has the same log entries but its commit index > leaders commit index
1136         followerActorContext.setCommitIndex(2);
1137
1138         leader = new Leader(leaderActorContext);
1139
1140         // Initial heartbeat
1141         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1142
1143         assertEquals(1, appendEntries.getLeaderCommit());
1144         assertEquals(0, appendEntries.getEntries().size());
1145         assertEquals(0, appendEntries.getPrevLogIndex());
1146
1147         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1148                 leaderActor, AppendEntriesReply.class);
1149
1150         assertEquals(2, appendEntriesReply.getLogLastIndex());
1151         assertEquals(1, appendEntriesReply.getLogLastTerm());
1152
1153         leaderActor.underlyingActor().setBehavior(follower);
1154         leader.handleMessage(followerActor, appendEntriesReply);
1155
1156         leaderActor.underlyingActor().clear();
1157         followerActor.underlyingActor().clear();
1158
1159         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1160                 TimeUnit.MILLISECONDS);
1161
1162         leader.handleMessage(leaderActor, new SendHeartBeat());
1163
1164         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1165
1166         assertEquals(2, appendEntries.getLeaderCommit());
1167         assertEquals(0, appendEntries.getEntries().size());
1168         assertEquals(2, appendEntries.getPrevLogIndex());
1169
1170         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1171
1172         assertEquals(2, appendEntriesReply.getLogLastIndex());
1173         assertEquals(1, appendEntriesReply.getLogLastTerm());
1174
1175         assertEquals(2, followerActorContext.getCommitIndex());
1176
1177         follower.close();
1178     }
1179
1180     @Test
1181     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1182         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1183
1184         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1185         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1186                 new FiniteDuration(1000, TimeUnit.SECONDS));
1187
1188         leaderActorContext.setReplicatedLog(
1189                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1190         long leaderCommitIndex = 2;
1191         leaderActorContext.setCommitIndex(leaderCommitIndex);
1192         leaderActorContext.setLastApplied(leaderCommitIndex);
1193
1194         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1195         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1196
1197         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1198
1199         followerActorContext.setReplicatedLog(
1200                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1201         followerActorContext.setCommitIndex(0);
1202         followerActorContext.setLastApplied(0);
1203
1204         Follower follower = new Follower(followerActorContext);
1205         followerActor.underlyingActor().setBehavior(follower);
1206
1207         leader = new Leader(leaderActorContext);
1208
1209         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1210         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1211
1212         MessageCollectorActor.clearMessages(followerActor);
1213         MessageCollectorActor.clearMessages(leaderActor);
1214
1215         // Verify initial AppendEntries sent with the leader's current commit index.
1216         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1217         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1218         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1219
1220         leaderActor.underlyingActor().setBehavior(leader);
1221
1222         leader.handleMessage(followerActor, appendEntriesReply);
1223
1224         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1225         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1226
1227         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1228         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1229         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1230
1231         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1232         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1233                 appendEntries.getEntries().get(0).getData());
1234         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1235         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1236                 appendEntries.getEntries().get(1).getData());
1237
1238         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1239         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1240
1241         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1242
1243         ApplyState applyState = applyStateList.get(0);
1244         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1245         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1246         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1247                 applyState.getReplicatedLogEntry().getData());
1248
1249         applyState = applyStateList.get(1);
1250         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1251         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1252         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1253                 applyState.getReplicatedLogEntry().getData());
1254
1255         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1256         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1257     }
1258
1259     @Test
1260     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1261         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1262
1263         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1264         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1265                 new FiniteDuration(1000, TimeUnit.SECONDS));
1266
1267         leaderActorContext.setReplicatedLog(
1268                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1269         long leaderCommitIndex = 1;
1270         leaderActorContext.setCommitIndex(leaderCommitIndex);
1271         leaderActorContext.setLastApplied(leaderCommitIndex);
1272
1273         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1274         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1275
1276         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1277
1278         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1279         followerActorContext.setCommitIndex(-1);
1280         followerActorContext.setLastApplied(-1);
1281
1282         Follower follower = new Follower(followerActorContext);
1283         followerActor.underlyingActor().setBehavior(follower);
1284
1285         leader = new Leader(leaderActorContext);
1286
1287         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1288         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1289
1290         MessageCollectorActor.clearMessages(followerActor);
1291         MessageCollectorActor.clearMessages(leaderActor);
1292
1293         // Verify initial AppendEntries sent with the leader's current commit index.
1294         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1295         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1296         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1297
1298         leaderActor.underlyingActor().setBehavior(leader);
1299
1300         leader.handleMessage(followerActor, appendEntriesReply);
1301
1302         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1303         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1304
1305         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1306         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1307         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1308
1309         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1310         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1311                 appendEntries.getEntries().get(0).getData());
1312         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1313         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1314                 appendEntries.getEntries().get(1).getData());
1315
1316         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1317         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1318
1319         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1320
1321         ApplyState applyState = applyStateList.get(0);
1322         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1323         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1324         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1325                 applyState.getReplicatedLogEntry().getData());
1326
1327         applyState = applyStateList.get(1);
1328         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1329         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1330         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1331                 applyState.getReplicatedLogEntry().getData());
1332
1333         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1334         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1335     }
1336
1337     @Test
1338     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1339         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1340
1341         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1342         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1343                 new FiniteDuration(1000, TimeUnit.SECONDS));
1344
1345         leaderActorContext.setReplicatedLog(
1346                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1347         long leaderCommitIndex = 1;
1348         leaderActorContext.setCommitIndex(leaderCommitIndex);
1349         leaderActorContext.setLastApplied(leaderCommitIndex);
1350
1351         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1352         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1353
1354         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1355
1356         followerActorContext.setReplicatedLog(
1357                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1358         followerActorContext.setCommitIndex(-1);
1359         followerActorContext.setLastApplied(-1);
1360
1361         Follower follower = new Follower(followerActorContext);
1362         followerActor.underlyingActor().setBehavior(follower);
1363
1364         leader = new Leader(leaderActorContext);
1365
1366         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1367         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1368
1369         MessageCollectorActor.clearMessages(followerActor);
1370         MessageCollectorActor.clearMessages(leaderActor);
1371
1372         // Verify initial AppendEntries sent with the leader's current commit index.
1373         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1374         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1375         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1376
1377         leaderActor.underlyingActor().setBehavior(leader);
1378
1379         leader.handleMessage(followerActor, appendEntriesReply);
1380
1381         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1382         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1383
1384         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1385         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1386         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1387
1388         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1389         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1390         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1391                 appendEntries.getEntries().get(0).getData());
1392         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1393         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1394         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1395                 appendEntries.getEntries().get(1).getData());
1396
1397         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1398         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1399
1400         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1401
1402         ApplyState applyState = applyStateList.get(0);
1403         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1404         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1405         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1406                 applyState.getReplicatedLogEntry().getData());
1407
1408         applyState = applyStateList.get(1);
1409         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1410         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1411         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1412                 applyState.getReplicatedLogEntry().getData());
1413
1414         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1415         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1416         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1417     }
1418
1419     @Test
1420     public void testHandleAppendEntriesReplySuccess() throws Exception {
1421         logStart("testHandleAppendEntriesReplySuccess");
1422
1423         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1424
1425         leaderActorContext.setReplicatedLog(
1426                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1427
1428         leaderActorContext.setCommitIndex(1);
1429         leaderActorContext.setLastApplied(1);
1430         leaderActorContext.getTermInformation().update(1, "leader");
1431
1432         leader = new Leader(leaderActorContext);
1433
1434         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1435
1436         short payloadVersion = 5;
1437         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1438
1439         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1440
1441         assertEquals(RaftState.Leader, raftActorBehavior.state());
1442
1443         assertEquals(2, leaderActorContext.getCommitIndex());
1444
1445         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1446                 leaderActor, ApplyJournalEntries.class);
1447
1448         assertEquals(2, leaderActorContext.getLastApplied());
1449
1450         assertEquals(2, applyJournalEntries.getToIndex());
1451
1452         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1453                 ApplyState.class);
1454
1455         assertEquals(1,applyStateList.size());
1456
1457         ApplyState applyState = applyStateList.get(0);
1458
1459         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1460
1461         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1462         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1463     }
1464
1465     @Test
1466     public void testHandleAppendEntriesReplyUnknownFollower(){
1467         logStart("testHandleAppendEntriesReplyUnknownFollower");
1468
1469         MockRaftActorContext leaderActorContext = createActorContext();
1470
1471         leader = new Leader(leaderActorContext);
1472
1473         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1474
1475         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1476
1477         assertEquals(RaftState.Leader, raftActorBehavior.state());
1478     }
1479
1480     @Test
1481     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1482         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1483
1484         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1485         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1486                 new FiniteDuration(1000, TimeUnit.SECONDS));
1487         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1488
1489         leaderActorContext.setReplicatedLog(
1490                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1491         long leaderCommitIndex = 3;
1492         leaderActorContext.setCommitIndex(leaderCommitIndex);
1493         leaderActorContext.setLastApplied(leaderCommitIndex);
1494
1495         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1496         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1497         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1498         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1499
1500         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1501
1502         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1503         followerActorContext.setCommitIndex(-1);
1504         followerActorContext.setLastApplied(-1);
1505
1506         Follower follower = new Follower(followerActorContext);
1507         followerActor.underlyingActor().setBehavior(follower);
1508
1509         leader = new Leader(leaderActorContext);
1510
1511         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1512         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1513
1514         MessageCollectorActor.clearMessages(followerActor);
1515         MessageCollectorActor.clearMessages(leaderActor);
1516
1517         // Verify initial AppendEntries sent with the leader's current commit index.
1518         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1519         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1520         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1521
1522         leaderActor.underlyingActor().setBehavior(leader);
1523
1524         leader.handleMessage(followerActor, appendEntriesReply);
1525
1526         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1527         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1528
1529         appendEntries = appendEntriesList.get(0);
1530         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1531         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1532         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1533
1534         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1535         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1536                 appendEntries.getEntries().get(0).getData());
1537         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1538         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1539                 appendEntries.getEntries().get(1).getData());
1540
1541         appendEntries = appendEntriesList.get(1);
1542         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1543         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1544         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1545
1546         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1547         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1548                 appendEntries.getEntries().get(0).getData());
1549         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1550         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1551                 appendEntries.getEntries().get(1).getData());
1552
1553         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1554         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1555
1556         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1557
1558         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1559         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1560     }
1561
1562     @Test
1563     public void testHandleRequestVoteReply(){
1564         logStart("testHandleRequestVoteReply");
1565
1566         MockRaftActorContext leaderActorContext = createActorContext();
1567
1568         leader = new Leader(leaderActorContext);
1569
1570         // Should be a no-op.
1571         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1572                 new RequestVoteReply(1, true));
1573
1574         assertEquals(RaftState.Leader, raftActorBehavior.state());
1575
1576         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1577
1578         assertEquals(RaftState.Leader, raftActorBehavior.state());
1579     }
1580
1581     @Test
1582     public void testIsolatedLeaderCheckNoFollowers() {
1583         logStart("testIsolatedLeaderCheckNoFollowers");
1584
1585         MockRaftActorContext leaderActorContext = createActorContext();
1586
1587         leader = new Leader(leaderActorContext);
1588         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1589         Assert.assertTrue(behavior instanceof Leader);
1590     }
1591
1592     @Test
1593     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1594         logStart("testIsolatedLeaderCheckTwoFollowers");
1595
1596         new JavaTestKit(getSystem()) {{
1597
1598             ActorRef followerActor1 = getTestActor();
1599             ActorRef followerActor2 = getTestActor();
1600
1601             MockRaftActorContext leaderActorContext = createActorContext();
1602
1603             Map<String, String> peerAddresses = new HashMap<>();
1604             peerAddresses.put("follower-1", followerActor1.path().toString());
1605             peerAddresses.put("follower-2", followerActor2.path().toString());
1606
1607             leaderActorContext.setPeerAddresses(peerAddresses);
1608
1609             leader = new Leader(leaderActorContext);
1610
1611             leader.markFollowerActive("follower-1");
1612             leader.markFollowerActive("follower-2");
1613             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1614             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1615                 behavior instanceof Leader);
1616
1617             // kill 1 follower and verify if that got killed
1618             final JavaTestKit probe = new JavaTestKit(getSystem());
1619             probe.watch(followerActor1);
1620             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1621             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1622             assertEquals(termMsg1.getActor(), followerActor1);
1623
1624             leader.markFollowerInActive("follower-1");
1625             leader.markFollowerActive("follower-2");
1626             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1627             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1628                 behavior instanceof Leader);
1629
1630             // kill 2nd follower and leader should change to Isolated leader
1631             followerActor2.tell(PoisonPill.getInstance(), null);
1632             probe.watch(followerActor2);
1633             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1634             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1635             assertEquals(termMsg2.getActor(), followerActor2);
1636
1637             leader.markFollowerInActive("follower-2");
1638             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1639             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1640                 behavior instanceof IsolatedLeader);
1641         }};
1642     }
1643
1644     @Test
1645     public void testLaggingFollowerStarvation() throws Exception {
1646         logStart("testLaggingFollowerStarvation");
1647         new JavaTestKit(getSystem()) {{
1648             String leaderActorId = actorFactory.generateActorId("leader");
1649             String follower1ActorId = actorFactory.generateActorId("follower");
1650             String follower2ActorId = actorFactory.generateActorId("follower");
1651
1652             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1653                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1654             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1655             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1656
1657             MockRaftActorContext leaderActorContext =
1658                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1659
1660             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1661             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1662             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1663
1664             leaderActorContext.setConfigParams(configParams);
1665
1666             leaderActorContext.setReplicatedLog(
1667                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1668
1669             Map<String, String> peerAddresses = new HashMap<>();
1670             peerAddresses.put(follower1ActorId,
1671                     follower1Actor.path().toString());
1672             peerAddresses.put(follower2ActorId,
1673                     follower2Actor.path().toString());
1674
1675             leaderActorContext.setPeerAddresses(peerAddresses);
1676             leaderActorContext.getTermInformation().update(1, leaderActorId);
1677
1678             RaftActorBehavior leader = createBehavior(leaderActorContext);
1679
1680             leaderActor.underlyingActor().setBehavior(leader);
1681
1682             for(int i=1;i<6;i++) {
1683                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1684                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1685                 assertTrue(newBehavior == leader);
1686                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1687             }
1688
1689             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1690             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1691
1692             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1693                     heartbeats.size() > 1);
1694
1695             // Check if follower-2 got AppendEntries during this time and was not starved
1696             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1697
1698             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1699                     appendEntries.size() > 1);
1700
1701         }};
1702     }
1703
1704     @Override
1705     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1706             ActorRef actorRef, RaftRPC rpc) throws Exception {
1707         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1708         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1709     }
1710
1711     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1712
1713         private final long electionTimeOutIntervalMillis;
1714         private final int snapshotChunkSize;
1715
1716         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1717             super();
1718             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1719             this.snapshotChunkSize = snapshotChunkSize;
1720         }
1721
1722         @Override
1723         public FiniteDuration getElectionTimeOutInterval() {
1724             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1725         }
1726
1727         @Override
1728         public int getSnapshotChunkSize() {
1729             return snapshotChunkSize;
1730         }
1731     }
1732 }