c99c85fbce715141e824b5b67f33ba419b2f3991
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import org.opendaylight.yangtools.util.StringIdentifier;
68 import scala.concurrent.duration.FiniteDuration;
69
70 public class LeaderTest extends AbstractLeaderTest<Leader> {
71
72     static final String FOLLOWER_ID = "follower";
73     public static final String LEADER_ID = "leader";
74
75     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
76             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
77
78     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
79             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
80
81     private Leader leader;
82     private final short payloadVersion = 5;
83
84     @Override
85     @After
86     public void tearDown() throws Exception {
87         if(leader != null) {
88             leader.close();
89         }
90
91         super.tearDown();
92     }
93
94     @Test
95     public void testHandleMessageForUnknownMessage() throws Exception {
96         logStart("testHandleMessageForUnknownMessage");
97
98         leader = new Leader(createActorContext());
99
100         // handle message should null when it receives an unknown message
101         assertNull(leader.handleMessage(followerActor, "foo"));
102     }
103
104     @Test
105     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107
108         MockRaftActorContext actorContext = createActorContextWithFollower();
109         short payloadVersion = (short)5;
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116         actorContext.setCurrentBehavior(leader);
117
118         // Leader should send an immediate heartbeat with no entries as follower is inactive.
119         long lastIndex = actorContext.getReplicatedLog().lastIndex();
120         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getTerm", term, appendEntries.getTerm());
122         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 0, appendEntries.getEntries().size());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126
127         // The follower would normally reply - simulate that explicitly here.
128         leader.handleMessage(followerActor, new AppendEntriesReply(
129                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
131
132         followerActor.underlyingActor().clear();
133
134         // Sleep for the heartbeat interval so AppendEntries is sent.
135         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137
138         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139
140         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143         assertEquals("Entries size", 1, appendEntries.getEntries().size());
144         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147     }
148
149
150     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151         return sendReplicate(actorContext, 1, index);
152     }
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157                 term, index, payload);
158         actorContext.getReplicatedLog().append(newEntry);
159         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160     }
161
162     @Test
163     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165
166         MockRaftActorContext actorContext = createActorContextWithFollower();
167
168         long term = 1;
169         actorContext.getTermInformation().update(term, "");
170
171         leader = new Leader(actorContext);
172
173         // Leader will send an immediate heartbeat - ignore it.
174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175
176         // The follower would normally reply - simulate that explicitly here.
177         long lastIndex = actorContext.getReplicatedLog().lastIndex();
178         leader.handleMessage(followerActor, new AppendEntriesReply(
179                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
181
182         followerActor.underlyingActor().clear();
183
184         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185
186         // State should not change
187         assertTrue(raftBehavior instanceof Leader);
188
189         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192         assertEquals("Entries size", 1, appendEntries.getEntries().size());
193         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
197     }
198
199     @Test
200     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202
203         MockRaftActorContext actorContext = createActorContextWithFollower();
204
205         // The raft context is initialized with a couple log entries. However the commitIndex
206         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
207         // committed and applied. Now it regains leadership with a higher term (2).
208         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
209         long newTerm = prevTerm + 1;
210         actorContext.getTermInformation().update(newTerm, "");
211
212         leader = new Leader(actorContext);
213         actorContext.setCurrentBehavior(leader);
214
215         // Leader will send an immediate heartbeat - ignore it.
216         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
217
218         // The follower replies with the leader's current last index and term, simulating that it is
219         // up to date with the leader.
220         long lastIndex = actorContext.getReplicatedLog().lastIndex();
221         leader.handleMessage(followerActor, new AppendEntriesReply(
222                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
223
224         // The commit index should not get updated even though consensus was reached. This is b/c the
225         // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
226         // from previous terms by counting replicas".
227         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
228
229         followerActor.underlyingActor().clear();
230
231         // Now replicate a new entry with the new term 2.
232         long newIndex = lastIndex + 1;
233         sendReplicate(actorContext, newTerm, newIndex);
234
235         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
237         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
238         assertEquals("Entries size", 1, appendEntries.getEntries().size());
239         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
240         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
241         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
242
243         // The follower replies with success. The leader should now update the commit index to the new index
244         // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
245         // prior entries are committed indirectly".
246         leader.handleMessage(followerActor, new AppendEntriesReply(
247                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
248
249         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
250     }
251
252     @Test
253     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
254         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
255
256         MockRaftActorContext actorContext = createActorContextWithFollower();
257         actorContext.setRaftPolicy(createRaftPolicy(true, true));
258
259         long term = 1;
260         actorContext.getTermInformation().update(term, "");
261
262         leader = new Leader(actorContext);
263
264         // Leader will send an immediate heartbeat - ignore it.
265         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
266
267         // The follower would normally reply - simulate that explicitly here.
268         long lastIndex = actorContext.getReplicatedLog().lastIndex();
269         leader.handleMessage(followerActor, new AppendEntriesReply(
270                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
271         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
272
273         followerActor.underlyingActor().clear();
274
275         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
276
277         // State should not change
278         assertTrue(raftBehavior instanceof Leader);
279
280         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
282         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
283         assertEquals("Entries size", 1, appendEntries.getEntries().size());
284         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
285         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
286         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
287         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
288     }
289
290     @Test
291     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
292         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
293
294         MockRaftActorContext actorContext = createActorContextWithFollower();
295         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
296             @Override
297             public FiniteDuration getHeartBeatInterval() {
298                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
299             }
300         });
301
302         long term = 1;
303         actorContext.getTermInformation().update(term, "");
304
305         leader = new Leader(actorContext);
306
307         // Leader will send an immediate heartbeat - ignore it.
308         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
309
310         // The follower would normally reply - simulate that explicitly here.
311         long lastIndex = actorContext.getReplicatedLog().lastIndex();
312         leader.handleMessage(followerActor, new AppendEntriesReply(
313                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
314         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
315
316         followerActor.underlyingActor().clear();
317
318         for(int i=0;i<5;i++) {
319             sendReplicate(actorContext, lastIndex+i+1);
320         }
321
322         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
323         // We expect only 1 message to be sent because of two reasons,
324         // - an append entries reply was not received
325         // - the heartbeat interval has not expired
326         // In this scenario if multiple messages are sent they would likely be duplicates
327         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
328     }
329
330     @Test
331     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
332         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
333
334         MockRaftActorContext actorContext = createActorContextWithFollower();
335         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
336             @Override
337             public FiniteDuration getHeartBeatInterval() {
338                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
339             }
340         });
341
342         long term = 1;
343         actorContext.getTermInformation().update(term, "");
344
345         leader = new Leader(actorContext);
346
347         // Leader will send an immediate heartbeat - ignore it.
348         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
349
350         // The follower would normally reply - simulate that explicitly here.
351         long lastIndex = actorContext.getReplicatedLog().lastIndex();
352         leader.handleMessage(followerActor, new AppendEntriesReply(
353                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
354         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
355
356         followerActor.underlyingActor().clear();
357
358         for(int i=0;i<3;i++) {
359             sendReplicate(actorContext, lastIndex+i+1);
360             leader.handleMessage(followerActor, new AppendEntriesReply(
361                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
362
363         }
364
365         for(int i=3;i<5;i++) {
366             sendReplicate(actorContext, lastIndex + i + 1);
367         }
368
369         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
370         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
371         // get sent to the follower - but not the 5th
372         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
373
374         for(int i=0;i<4;i++) {
375             long expected = allMessages.get(i).getEntries().get(0).getIndex();
376             assertEquals(expected, i+2);
377         }
378     }
379
380     @Test
381     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
382         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
383
384         MockRaftActorContext actorContext = createActorContextWithFollower();
385         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
386             @Override
387             public FiniteDuration getHeartBeatInterval() {
388                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
389             }
390         });
391
392         long term = 1;
393         actorContext.getTermInformation().update(term, "");
394
395         leader = new Leader(actorContext);
396
397         // Leader will send an immediate heartbeat - ignore it.
398         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
399
400         // The follower would normally reply - simulate that explicitly here.
401         long lastIndex = actorContext.getReplicatedLog().lastIndex();
402         leader.handleMessage(followerActor, new AppendEntriesReply(
403                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
404         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
405
406         followerActor.underlyingActor().clear();
407
408         sendReplicate(actorContext, lastIndex+1);
409
410         // Wait slightly longer than heartbeat duration
411         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
412
413         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
414
415         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
416         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
417
418         assertEquals(1, allMessages.get(0).getEntries().size());
419         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
420         assertEquals(1, allMessages.get(1).getEntries().size());
421         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
422
423     }
424
425     @Test
426     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
427         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
428
429         MockRaftActorContext actorContext = createActorContextWithFollower();
430         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
431             @Override
432             public FiniteDuration getHeartBeatInterval() {
433                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
434             }
435         });
436
437         long term = 1;
438         actorContext.getTermInformation().update(term, "");
439
440         leader = new Leader(actorContext);
441
442         // Leader will send an immediate heartbeat - ignore it.
443         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
444
445         // The follower would normally reply - simulate that explicitly here.
446         long lastIndex = actorContext.getReplicatedLog().lastIndex();
447         leader.handleMessage(followerActor, new AppendEntriesReply(
448                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
449         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
450
451         followerActor.underlyingActor().clear();
452
453         for(int i=0;i<3;i++) {
454             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
455             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
456         }
457
458         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
459         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
460     }
461
462     @Test
463     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
464         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
465
466         MockRaftActorContext actorContext = createActorContextWithFollower();
467         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
468             @Override
469             public FiniteDuration getHeartBeatInterval() {
470                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
471             }
472         });
473
474         long term = 1;
475         actorContext.getTermInformation().update(term, "");
476
477         leader = new Leader(actorContext);
478
479         // Leader will send an immediate heartbeat - ignore it.
480         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
481
482         // The follower would normally reply - simulate that explicitly here.
483         long lastIndex = actorContext.getReplicatedLog().lastIndex();
484         leader.handleMessage(followerActor, new AppendEntriesReply(
485                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
486         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
487
488         followerActor.underlyingActor().clear();
489
490         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
492         sendReplicate(actorContext, lastIndex+1);
493
494         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
496
497         assertEquals(0, allMessages.get(0).getEntries().size());
498         assertEquals(1, allMessages.get(1).getEntries().size());
499     }
500
501
502     @Test
503     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
504         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
505
506         MockRaftActorContext actorContext = createActorContext();
507
508         leader = new Leader(actorContext);
509
510         actorContext.setLastApplied(0);
511
512         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
513         long term = actorContext.getTermInformation().getCurrentTerm();
514         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
515                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
516
517         actorContext.getReplicatedLog().append(newEntry);
518
519         final Identifier id = new StringIdentifier("state-id");
520         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
521
522         // State should not change
523         assertTrue(raftBehavior instanceof Leader);
524
525         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
526
527         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
528         // one since lastApplied state is 0.
529         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
530                 leaderActor, ApplyState.class);
531         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
532
533         for(int i = 0; i <= newLogIndex - 1; i++ ) {
534             ApplyState applyState = applyStateList.get(i);
535             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
536             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
537         }
538
539         ApplyState last = applyStateList.get((int) newLogIndex - 1);
540         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
541         assertEquals("getIdentifier", id, last.getIdentifier());
542     }
543
544     @Test
545     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
546         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
547
548         MockRaftActorContext actorContext = createActorContextWithFollower();
549
550         Map<String, String> leadersSnapshot = new HashMap<>();
551         leadersSnapshot.put("1", "A");
552         leadersSnapshot.put("2", "B");
553         leadersSnapshot.put("3", "C");
554
555         //clears leaders log
556         actorContext.getReplicatedLog().removeFrom(0);
557
558         final int commitIndex = 3;
559         final int snapshotIndex = 2;
560         final int newEntryIndex = 4;
561         final int snapshotTerm = 1;
562         final int currentTerm = 2;
563
564         // set the snapshot variables in replicatedlog
565         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567         actorContext.setCommitIndex(commitIndex);
568         //set follower timeout to 2 mins, helps during debugging
569         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
570
571         leader = new Leader(actorContext);
572
573         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
574         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
575
576         // new entry
577         ReplicatedLogImplEntry entry =
578                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579                         new MockRaftActorContext.MockPayload("D"));
580
581         //update follower timestamp
582         leader.markFollowerActive(FOLLOWER_ID);
583
584         ByteString bs = toByteString(leadersSnapshot);
585         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
586                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
587         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
588         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
589
590         //send first chunk and no InstallSnapshotReply received yet
591         fts.getNextChunk();
592         fts.incrementChunkIndex();
593
594         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
595                 TimeUnit.MILLISECONDS);
596
597         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
598
599         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
600
601         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
602
603         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
604
605         //InstallSnapshotReply received
606         fts.markSendStatus(true);
607
608         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
609
610         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
611
612         assertEquals(commitIndex, is.getLastIncludedIndex());
613     }
614
615     @Test
616     public void testSendAppendEntriesSnapshotScenario() throws Exception {
617         logStart("testSendAppendEntriesSnapshotScenario");
618
619         MockRaftActorContext actorContext = createActorContextWithFollower();
620
621         Map<String, String> leadersSnapshot = new HashMap<>();
622         leadersSnapshot.put("1", "A");
623         leadersSnapshot.put("2", "B");
624         leadersSnapshot.put("3", "C");
625
626         //clears leaders log
627         actorContext.getReplicatedLog().removeFrom(0);
628
629         final int followersLastIndex = 2;
630         final int snapshotIndex = 3;
631         final int newEntryIndex = 4;
632         final int snapshotTerm = 1;
633         final int currentTerm = 2;
634
635         // set the snapshot variables in replicatedlog
636         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638         actorContext.setCommitIndex(followersLastIndex);
639
640         leader = new Leader(actorContext);
641
642         // Leader will send an immediate heartbeat - ignore it.
643         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
644
645         // new entry
646         ReplicatedLogImplEntry entry =
647                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
648                         new MockRaftActorContext.MockPayload("D"));
649
650         actorContext.getReplicatedLog().append(entry);
651
652         //update follower timestamp
653         leader.markFollowerActive(FOLLOWER_ID);
654
655         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
656         RaftActorBehavior raftBehavior = leader.handleMessage(
657                 leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
658
659         assertTrue(raftBehavior instanceof Leader);
660
661         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
662     }
663
664     @Test
665     public void testInitiateInstallSnapshot() throws Exception {
666         logStart("testInitiateInstallSnapshot");
667
668         MockRaftActorContext actorContext = createActorContextWithFollower();
669
670         //clears leaders log
671         actorContext.getReplicatedLog().removeFrom(0);
672
673         final int followersLastIndex = 2;
674         final int snapshotIndex = 3;
675         final int newEntryIndex = 4;
676         final int snapshotTerm = 1;
677         final int currentTerm = 2;
678
679         // set the snapshot variables in replicatedlog
680         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
681         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
682         actorContext.setLastApplied(3);
683         actorContext.setCommitIndex(followersLastIndex);
684
685         leader = new Leader(actorContext);
686
687         // Leader will send an immediate heartbeat - ignore it.
688         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689
690         // set the snapshot as absent and check if capture-snapshot is invoked.
691         leader.setSnapshot(null);
692
693         // new entry
694         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
695                 new MockRaftActorContext.MockPayload("D"));
696
697         actorContext.getReplicatedLog().append(entry);
698
699         //update follower timestamp
700         leader.markFollowerActive(FOLLOWER_ID);
701
702         leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
703
704         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
705
706         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
707
708         assertTrue(cs.isInstallSnapshotInitiated());
709         assertEquals(3, cs.getLastAppliedIndex());
710         assertEquals(1, cs.getLastAppliedTerm());
711         assertEquals(4, cs.getLastIndex());
712         assertEquals(2, cs.getLastTerm());
713
714         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
715         leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
716
717         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
718     }
719
720     @Test
721     public void testInitiateForceInstallSnapshot() throws Exception {
722         logStart("testInitiateForceInstallSnapshot");
723
724         MockRaftActorContext actorContext = createActorContextWithFollower();
725
726         final int followersLastIndex = 2;
727         final int snapshotIndex = -1;
728         final int newEntryIndex = 4;
729         final int snapshotTerm = -1;
730         final int currentTerm = 2;
731
732         // set the snapshot variables in replicatedlog
733         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
734         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
735         actorContext.setLastApplied(3);
736         actorContext.setCommitIndex(followersLastIndex);
737
738         actorContext.getReplicatedLog().removeFrom(0);
739
740         leader = new Leader(actorContext);
741
742         // Leader will send an immediate heartbeat - ignore it.
743         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
744
745         // set the snapshot as absent and check if capture-snapshot is invoked.
746         leader.setSnapshot(null);
747
748         for(int i=0;i<4;i++) {
749             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
750                     new MockRaftActorContext.MockPayload("X" + i)));
751         }
752
753         // new entry
754         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
755                 new MockRaftActorContext.MockPayload("D"));
756
757         actorContext.getReplicatedLog().append(entry);
758
759         //update follower timestamp
760         leader.markFollowerActive(FOLLOWER_ID);
761
762         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
763         // installed with a SendInstallSnapshot
764         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
765
766         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
767
768         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
769
770         assertTrue(cs.isInstallSnapshotInitiated());
771         assertEquals(3, cs.getLastAppliedIndex());
772         assertEquals(1, cs.getLastAppliedTerm());
773         assertEquals(4, cs.getLastIndex());
774         assertEquals(2, cs.getLastTerm());
775
776         // if an initiate is started again when first is in progress, it should not initiate Capture
777         leader.handleMessage(leaderActor, new Replicate(null, new StringIdentifier("state-id"), entry));
778
779         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
780     }
781
782
783     @Test
784     public void testInstallSnapshot() throws Exception {
785         logStart("testInstallSnapshot");
786
787         MockRaftActorContext actorContext = createActorContextWithFollower();
788
789         Map<String, String> leadersSnapshot = new HashMap<>();
790         leadersSnapshot.put("1", "A");
791         leadersSnapshot.put("2", "B");
792         leadersSnapshot.put("3", "C");
793
794         //clears leaders log
795         actorContext.getReplicatedLog().removeFrom(0);
796
797         final int lastAppliedIndex = 3;
798         final int snapshotIndex = 2;
799         final int snapshotTerm = 1;
800         final int currentTerm = 2;
801
802         // set the snapshot variables in replicatedlog
803         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806         actorContext.setCommitIndex(lastAppliedIndex);
807         actorContext.setLastApplied(lastAppliedIndex);
808
809         leader = new Leader(actorContext);
810
811         // Initial heartbeat.
812         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
813
814         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
815         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
816
817         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
818                 Collections.<ReplicatedLogEntry>emptyList(),
819                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
820
821         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
822
823         assertTrue(raftBehavior instanceof Leader);
824
825         // check if installsnapshot gets called with the correct values.
826
827         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
828
829         assertNotNull(installSnapshot.getData());
830         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
831         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
832
833         assertEquals(currentTerm, installSnapshot.getTerm());
834     }
835
836     @Test
837     public void testForceInstallSnapshot() throws Exception {
838         logStart("testForceInstallSnapshot");
839
840         MockRaftActorContext actorContext = createActorContextWithFollower();
841
842         Map<String, String> leadersSnapshot = new HashMap<>();
843         leadersSnapshot.put("1", "A");
844         leadersSnapshot.put("2", "B");
845         leadersSnapshot.put("3", "C");
846
847         final int lastAppliedIndex = 3;
848         final int snapshotIndex = -1;
849         final int snapshotTerm = -1;
850         final int currentTerm = 2;
851
852         // set the snapshot variables in replicatedlog
853         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856         actorContext.setCommitIndex(lastAppliedIndex);
857         actorContext.setLastApplied(lastAppliedIndex);
858
859         leader = new Leader(actorContext);
860
861         // Initial heartbeat.
862         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
863
864         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
866
867         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
868                 Collections.<ReplicatedLogEntry>emptyList(),
869                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
870
871         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
872
873         assertTrue(raftBehavior instanceof Leader);
874
875         // check if installsnapshot gets called with the correct values.
876
877         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
878
879         assertNotNull(installSnapshot.getData());
880         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
881         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
882
883         assertEquals(currentTerm, installSnapshot.getTerm());
884     }
885
886     @Test
887     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
888         logStart("testHandleInstallSnapshotReplyLastChunk");
889
890         MockRaftActorContext actorContext = createActorContextWithFollower();
891
892         final int commitIndex = 3;
893         final int snapshotIndex = 2;
894         final int snapshotTerm = 1;
895         final int currentTerm = 2;
896
897         actorContext.setCommitIndex(commitIndex);
898
899         leader = new Leader(actorContext);
900         actorContext.setCurrentBehavior(leader);
901
902         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
903         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
904
905         // Ignore initial heartbeat.
906         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
907
908         Map<String, String> leadersSnapshot = new HashMap<>();
909         leadersSnapshot.put("1", "A");
910         leadersSnapshot.put("2", "B");
911         leadersSnapshot.put("3", "C");
912
913         // set the snapshot variables in replicatedlog
914
915         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
916         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
917         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
918
919         ByteString bs = toByteString(leadersSnapshot);
920         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
921                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
922         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
923         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
924         while(!fts.isLastChunk(fts.getChunkIndex())) {
925             fts.getNextChunk();
926             fts.incrementChunkIndex();
927         }
928
929         //clears leaders log
930         actorContext.getReplicatedLog().removeFrom(0);
931
932         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
933                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
934
935         assertTrue(raftBehavior instanceof Leader);
936
937         assertEquals(0, leader.followerSnapshotSize());
938         assertEquals(1, leader.followerLogSize());
939         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
940         assertNotNull(fli);
941         assertEquals(commitIndex, fli.getMatchIndex());
942         assertEquals(commitIndex + 1, fli.getNextIndex());
943     }
944
945     @Test
946     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
947         logStart("testSendSnapshotfromInstallSnapshotReply");
948
949         MockRaftActorContext actorContext = createActorContextWithFollower();
950
951         final int commitIndex = 3;
952         final int snapshotIndex = 2;
953         final int snapshotTerm = 1;
954         final int currentTerm = 2;
955
956         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
957             @Override
958             public int getSnapshotChunkSize() {
959                 return 50;
960             }
961         };
962         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
963         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
964
965         actorContext.setConfigParams(configParams);
966         actorContext.setCommitIndex(commitIndex);
967
968         leader = new Leader(actorContext);
969         actorContext.setCurrentBehavior(leader);
970
971         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
972         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
973
974         Map<String, String> leadersSnapshot = new HashMap<>();
975         leadersSnapshot.put("1", "A");
976         leadersSnapshot.put("2", "B");
977         leadersSnapshot.put("3", "C");
978
979         // set the snapshot variables in replicatedlog
980         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
981         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
982         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
983
984         ByteString bs = toByteString(leadersSnapshot);
985         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
986                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
987         leader.setSnapshot(snapshot);
988
989         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
990
991         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
992
993         assertEquals(1, installSnapshot.getChunkIndex());
994         assertEquals(3, installSnapshot.getTotalChunks());
995
996         followerActor.underlyingActor().clear();
997         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
998                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
999
1000         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1001
1002         assertEquals(2, installSnapshot.getChunkIndex());
1003         assertEquals(3, installSnapshot.getTotalChunks());
1004
1005         followerActor.underlyingActor().clear();
1006         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1007                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1008
1009         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1010
1011         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1012         followerActor.underlyingActor().clear();
1013         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1014                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1015
1016         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1017
1018         assertNull(installSnapshot);
1019     }
1020
1021
1022     @Test
1023     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1024         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1025
1026         MockRaftActorContext actorContext = createActorContextWithFollower();
1027
1028         final int commitIndex = 3;
1029         final int snapshotIndex = 2;
1030         final int snapshotTerm = 1;
1031         final int currentTerm = 2;
1032
1033         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1034             @Override
1035             public int getSnapshotChunkSize() {
1036                 return 50;
1037             }
1038         });
1039
1040         actorContext.setCommitIndex(commitIndex);
1041
1042         leader = new Leader(actorContext);
1043
1044         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1045         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1046
1047         Map<String, String> leadersSnapshot = new HashMap<>();
1048         leadersSnapshot.put("1", "A");
1049         leadersSnapshot.put("2", "B");
1050         leadersSnapshot.put("3", "C");
1051
1052         // set the snapshot variables in replicatedlog
1053         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1054         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1055         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1056
1057         ByteString bs = toByteString(leadersSnapshot);
1058         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1059                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1060         leader.setSnapshot(snapshot);
1061
1062         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1063         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1064
1065         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1066
1067         assertEquals(1, installSnapshot.getChunkIndex());
1068         assertEquals(3, installSnapshot.getTotalChunks());
1069
1070         followerActor.underlyingActor().clear();
1071
1072         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1073                 FOLLOWER_ID, -1, false));
1074
1075         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076                 TimeUnit.MILLISECONDS);
1077
1078         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1079
1080         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1081
1082         assertEquals(1, installSnapshot.getChunkIndex());
1083         assertEquals(3, installSnapshot.getTotalChunks());
1084     }
1085
1086     @Test
1087     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1088         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1089
1090         MockRaftActorContext actorContext = createActorContextWithFollower();
1091
1092         final int commitIndex = 3;
1093         final int snapshotIndex = 2;
1094         final int snapshotTerm = 1;
1095         final int currentTerm = 2;
1096
1097         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1098             @Override
1099             public int getSnapshotChunkSize() {
1100                 return 50;
1101             }
1102         });
1103
1104         actorContext.setCommitIndex(commitIndex);
1105
1106         leader = new Leader(actorContext);
1107
1108         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1109         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1110
1111         Map<String, String> leadersSnapshot = new HashMap<>();
1112         leadersSnapshot.put("1", "A");
1113         leadersSnapshot.put("2", "B");
1114         leadersSnapshot.put("3", "C");
1115
1116         // set the snapshot variables in replicatedlog
1117         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1118         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1119         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1120
1121         ByteString bs = toByteString(leadersSnapshot);
1122         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1123                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1124         leader.setSnapshot(snapshot);
1125
1126         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1127
1128         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1129
1130         assertEquals(1, installSnapshot.getChunkIndex());
1131         assertEquals(3, installSnapshot.getTotalChunks());
1132         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1133
1134         int hashCode = Arrays.hashCode(installSnapshot.getData());
1135
1136         followerActor.underlyingActor().clear();
1137
1138         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1139                 FOLLOWER_ID, 1, true));
1140
1141         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1142
1143         assertEquals(2, installSnapshot.getChunkIndex());
1144         assertEquals(3, installSnapshot.getTotalChunks());
1145         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1146     }
1147
1148     @Test
1149     public void testFollowerToSnapshotLogic() {
1150         logStart("testFollowerToSnapshotLogic");
1151
1152         MockRaftActorContext actorContext = createActorContext();
1153
1154         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1155             @Override
1156             public int getSnapshotChunkSize() {
1157                 return 50;
1158             }
1159         });
1160
1161         leader = new Leader(actorContext);
1162
1163         Map<String, String> leadersSnapshot = new HashMap<>();
1164         leadersSnapshot.put("1", "A");
1165         leadersSnapshot.put("2", "B");
1166         leadersSnapshot.put("3", "C");
1167
1168         ByteString bs = toByteString(leadersSnapshot);
1169         byte[] barray = bs.toByteArray();
1170
1171         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1172         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1173
1174         assertEquals(bs.size(), barray.length);
1175
1176         int chunkIndex=0;
1177         for (int i=0; i < barray.length; i = i + 50) {
1178             int j = i + 50;
1179             chunkIndex++;
1180
1181             if (i + 50 > barray.length) {
1182                 j = barray.length;
1183             }
1184
1185             byte[] chunk = fts.getNextChunk();
1186             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1187             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1188
1189             fts.markSendStatus(true);
1190             if (!fts.isLastChunk(chunkIndex)) {
1191                 fts.incrementChunkIndex();
1192             }
1193         }
1194
1195         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1196     }
1197
1198     @Override
1199     protected Leader createBehavior(final RaftActorContext actorContext) {
1200         return new Leader(actorContext);
1201     }
1202
1203     @Override
1204     protected MockRaftActorContext createActorContext() {
1205         return createActorContext(leaderActor);
1206     }
1207
1208     @Override
1209     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1210         return createActorContext(LEADER_ID, actorRef);
1211     }
1212
1213     private MockRaftActorContext createActorContextWithFollower() {
1214         MockRaftActorContext actorContext = createActorContext();
1215         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1216                 followerActor.path().toString()).build());
1217         return actorContext;
1218     }
1219
1220     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1221         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1222         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1223         configParams.setElectionTimeoutFactor(100000);
1224         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1225         context.setConfigParams(configParams);
1226         context.setPayloadVersion(payloadVersion);
1227         return context;
1228     }
1229
1230     private MockRaftActorContext createFollowerActorContextWithLeader() {
1231         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1232         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1233         followerConfig.setElectionTimeoutFactor(10000);
1234         followerActorContext.setConfigParams(followerConfig);
1235         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1236         return followerActorContext;
1237     }
1238
1239     @Test
1240     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1241         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1242
1243         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1244
1245         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1246
1247         Follower follower = new Follower(followerActorContext);
1248         followerActor.underlyingActor().setBehavior(follower);
1249         followerActorContext.setCurrentBehavior(follower);
1250
1251         Map<String, String> peerAddresses = new HashMap<>();
1252         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1253
1254         leaderActorContext.setPeerAddresses(peerAddresses);
1255
1256         leaderActorContext.getReplicatedLog().removeFrom(0);
1257
1258         //create 3 entries
1259         leaderActorContext.setReplicatedLog(
1260                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1261
1262         leaderActorContext.setCommitIndex(1);
1263
1264         followerActorContext.getReplicatedLog().removeFrom(0);
1265
1266         // follower too has the exact same log entries and has the same commit index
1267         followerActorContext.setReplicatedLog(
1268                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1269
1270         followerActorContext.setCommitIndex(1);
1271
1272         leader = new Leader(leaderActorContext);
1273         leaderActorContext.setCurrentBehavior(leader);
1274
1275         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1276
1277         assertEquals(1, appendEntries.getLeaderCommit());
1278         assertEquals(0, appendEntries.getEntries().size());
1279         assertEquals(0, appendEntries.getPrevLogIndex());
1280
1281         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1282                 leaderActor, AppendEntriesReply.class);
1283
1284         assertEquals(2, appendEntriesReply.getLogLastIndex());
1285         assertEquals(1, appendEntriesReply.getLogLastTerm());
1286
1287         // follower returns its next index
1288         assertEquals(2, appendEntriesReply.getLogLastIndex());
1289         assertEquals(1, appendEntriesReply.getLogLastTerm());
1290
1291         follower.close();
1292     }
1293
1294     @Test
1295     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1296         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1297
1298         MockRaftActorContext leaderActorContext = createActorContext();
1299
1300         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1301         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1302
1303         Follower follower = new Follower(followerActorContext);
1304         followerActor.underlyingActor().setBehavior(follower);
1305         followerActorContext.setCurrentBehavior(follower);
1306
1307         Map<String, String> leaderPeerAddresses = new HashMap<>();
1308         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1309
1310         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1311
1312         leaderActorContext.getReplicatedLog().removeFrom(0);
1313
1314         leaderActorContext.setReplicatedLog(
1315                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1316
1317         leaderActorContext.setCommitIndex(1);
1318
1319         followerActorContext.getReplicatedLog().removeFrom(0);
1320
1321         followerActorContext.setReplicatedLog(
1322                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1323
1324         // follower has the same log entries but its commit index > leaders commit index
1325         followerActorContext.setCommitIndex(2);
1326
1327         leader = new Leader(leaderActorContext);
1328
1329         // Initial heartbeat
1330         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1331
1332         assertEquals(1, appendEntries.getLeaderCommit());
1333         assertEquals(0, appendEntries.getEntries().size());
1334         assertEquals(0, appendEntries.getPrevLogIndex());
1335
1336         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1337                 leaderActor, AppendEntriesReply.class);
1338
1339         assertEquals(2, appendEntriesReply.getLogLastIndex());
1340         assertEquals(1, appendEntriesReply.getLogLastTerm());
1341
1342         leaderActor.underlyingActor().setBehavior(follower);
1343         leader.handleMessage(followerActor, appendEntriesReply);
1344
1345         leaderActor.underlyingActor().clear();
1346         followerActor.underlyingActor().clear();
1347
1348         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1349                 TimeUnit.MILLISECONDS);
1350
1351         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1352
1353         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1354
1355         assertEquals(2, appendEntries.getLeaderCommit());
1356         assertEquals(0, appendEntries.getEntries().size());
1357         assertEquals(2, appendEntries.getPrevLogIndex());
1358
1359         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1360
1361         assertEquals(2, appendEntriesReply.getLogLastIndex());
1362         assertEquals(1, appendEntriesReply.getLogLastTerm());
1363
1364         assertEquals(2, followerActorContext.getCommitIndex());
1365
1366         follower.close();
1367     }
1368
1369     @Test
1370     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1371         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1372
1373         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1374         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1375                 new FiniteDuration(1000, TimeUnit.SECONDS));
1376
1377         leaderActorContext.setReplicatedLog(
1378                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1379         long leaderCommitIndex = 2;
1380         leaderActorContext.setCommitIndex(leaderCommitIndex);
1381         leaderActorContext.setLastApplied(leaderCommitIndex);
1382
1383         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1384         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1385
1386         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1387
1388         followerActorContext.setReplicatedLog(
1389                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1390         followerActorContext.setCommitIndex(0);
1391         followerActorContext.setLastApplied(0);
1392
1393         Follower follower = new Follower(followerActorContext);
1394         followerActor.underlyingActor().setBehavior(follower);
1395
1396         leader = new Leader(leaderActorContext);
1397
1398         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1399         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1400
1401         MessageCollectorActor.clearMessages(followerActor);
1402         MessageCollectorActor.clearMessages(leaderActor);
1403
1404         // Verify initial AppendEntries sent with the leader's current commit index.
1405         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1406         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1407         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1408
1409         leaderActor.underlyingActor().setBehavior(leader);
1410
1411         leader.handleMessage(followerActor, appendEntriesReply);
1412
1413         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1414         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1415
1416         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1417         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1418         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1419
1420         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1421         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1422                 appendEntries.getEntries().get(0).getData());
1423         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1424         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1425                 appendEntries.getEntries().get(1).getData());
1426
1427         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1428         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1429
1430         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1431
1432         ApplyState applyState = applyStateList.get(0);
1433         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1434         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1435         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1436                 applyState.getReplicatedLogEntry().getData());
1437
1438         applyState = applyStateList.get(1);
1439         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1440         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1441         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1442                 applyState.getReplicatedLogEntry().getData());
1443
1444         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1445         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1446     }
1447
1448     @Test
1449     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1450         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1451
1452         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1453         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1454                 new FiniteDuration(1000, TimeUnit.SECONDS));
1455
1456         leaderActorContext.setReplicatedLog(
1457                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1458         long leaderCommitIndex = 1;
1459         leaderActorContext.setCommitIndex(leaderCommitIndex);
1460         leaderActorContext.setLastApplied(leaderCommitIndex);
1461
1462         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1463         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1464
1465         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1466
1467         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1468         followerActorContext.setCommitIndex(-1);
1469         followerActorContext.setLastApplied(-1);
1470
1471         Follower follower = new Follower(followerActorContext);
1472         followerActor.underlyingActor().setBehavior(follower);
1473         followerActorContext.setCurrentBehavior(follower);
1474
1475         leader = new Leader(leaderActorContext);
1476
1477         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1478         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1479
1480         MessageCollectorActor.clearMessages(followerActor);
1481         MessageCollectorActor.clearMessages(leaderActor);
1482
1483         // Verify initial AppendEntries sent with the leader's current commit index.
1484         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1485         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1486         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1487
1488         leaderActor.underlyingActor().setBehavior(leader);
1489         leaderActorContext.setCurrentBehavior(leader);
1490
1491         leader.handleMessage(followerActor, appendEntriesReply);
1492
1493         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1494         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1495
1496         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1497         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1498         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1499
1500         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1501         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1502                 appendEntries.getEntries().get(0).getData());
1503         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1504         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1505                 appendEntries.getEntries().get(1).getData());
1506
1507         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1508         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1509
1510         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1511
1512         ApplyState applyState = applyStateList.get(0);
1513         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1514         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1515         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1516                 applyState.getReplicatedLogEntry().getData());
1517
1518         applyState = applyStateList.get(1);
1519         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1520         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1521         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1522                 applyState.getReplicatedLogEntry().getData());
1523
1524         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1525         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1526     }
1527
1528     @Test
1529     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1530         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1531
1532         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1533         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1534                 new FiniteDuration(1000, TimeUnit.SECONDS));
1535
1536         leaderActorContext.setReplicatedLog(
1537                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1538         long leaderCommitIndex = 1;
1539         leaderActorContext.setCommitIndex(leaderCommitIndex);
1540         leaderActorContext.setLastApplied(leaderCommitIndex);
1541
1542         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1543         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1544
1545         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1546
1547         followerActorContext.setReplicatedLog(
1548                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1549         followerActorContext.setCommitIndex(-1);
1550         followerActorContext.setLastApplied(-1);
1551
1552         Follower follower = new Follower(followerActorContext);
1553         followerActor.underlyingActor().setBehavior(follower);
1554         followerActorContext.setCurrentBehavior(follower);
1555
1556         leader = new Leader(leaderActorContext);
1557
1558         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1559         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1560
1561         MessageCollectorActor.clearMessages(followerActor);
1562         MessageCollectorActor.clearMessages(leaderActor);
1563
1564         // Verify initial AppendEntries sent with the leader's current commit index.
1565         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1566         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1567         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1568
1569         leaderActor.underlyingActor().setBehavior(leader);
1570         leaderActorContext.setCurrentBehavior(leader);
1571
1572         leader.handleMessage(followerActor, appendEntriesReply);
1573
1574         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1575         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1576
1577         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1578         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1579         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1580
1581         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1582         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1583         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1584                 appendEntries.getEntries().get(0).getData());
1585         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1586         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1587         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1588                 appendEntries.getEntries().get(1).getData());
1589
1590         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1591         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1592
1593         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1594
1595         ApplyState applyState = applyStateList.get(0);
1596         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1597         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1598         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1599                 applyState.getReplicatedLogEntry().getData());
1600
1601         applyState = applyStateList.get(1);
1602         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1603         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1604         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1605                 applyState.getReplicatedLogEntry().getData());
1606
1607         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1608         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1609         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1610     }
1611
1612     @Test
1613     public void testHandleAppendEntriesReplyWithNewerTerm(){
1614         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1615
1616         MockRaftActorContext leaderActorContext = createActorContext();
1617         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1618                 new FiniteDuration(10000, TimeUnit.SECONDS));
1619
1620         leaderActorContext.setReplicatedLog(
1621                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1622
1623         leader = new Leader(leaderActorContext);
1624         leaderActor.underlyingActor().setBehavior(leader);
1625         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1626
1627         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1628
1629         assertEquals(false, appendEntriesReply.isSuccess());
1630         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1631
1632         MessageCollectorActor.clearMessages(leaderActor);
1633     }
1634
1635     @Test
1636     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1637         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1638
1639         MockRaftActorContext leaderActorContext = createActorContext();
1640         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1641                 new FiniteDuration(10000, TimeUnit.SECONDS));
1642
1643         leaderActorContext.setReplicatedLog(
1644                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1645         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1646
1647         leader = new Leader(leaderActorContext);
1648         leaderActor.underlyingActor().setBehavior(leader);
1649         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1650
1651         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1652
1653         assertEquals(false, appendEntriesReply.isSuccess());
1654         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1655
1656         MessageCollectorActor.clearMessages(leaderActor);
1657     }
1658
1659     @Test
1660     public void testHandleAppendEntriesReplySuccess() throws Exception {
1661         logStart("testHandleAppendEntriesReplySuccess");
1662
1663         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1664
1665         leaderActorContext.setReplicatedLog(
1666                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1667
1668         leaderActorContext.setCommitIndex(1);
1669         leaderActorContext.setLastApplied(1);
1670         leaderActorContext.getTermInformation().update(1, "leader");
1671
1672         leader = new Leader(leaderActorContext);
1673
1674         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1675
1676         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1677         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1678
1679         short payloadVersion = 5;
1680         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1681
1682         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1683
1684         assertEquals(RaftState.Leader, raftActorBehavior.state());
1685
1686         assertEquals(2, leaderActorContext.getCommitIndex());
1687
1688         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1689                 leaderActor, ApplyJournalEntries.class);
1690
1691         assertEquals(2, leaderActorContext.getLastApplied());
1692
1693         assertEquals(2, applyJournalEntries.getToIndex());
1694
1695         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1696                 ApplyState.class);
1697
1698         assertEquals(1,applyStateList.size());
1699
1700         ApplyState applyState = applyStateList.get(0);
1701
1702         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1703
1704         assertEquals(2, followerInfo.getMatchIndex());
1705         assertEquals(3, followerInfo.getNextIndex());
1706         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1707         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1708     }
1709
1710     @Test
1711     public void testHandleAppendEntriesReplyUnknownFollower(){
1712         logStart("testHandleAppendEntriesReplyUnknownFollower");
1713
1714         MockRaftActorContext leaderActorContext = createActorContext();
1715
1716         leader = new Leader(leaderActorContext);
1717
1718         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1719
1720         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1721
1722         assertEquals(RaftState.Leader, raftActorBehavior.state());
1723     }
1724
1725     @Test
1726     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1727         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1728
1729         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1730         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1731                 new FiniteDuration(1000, TimeUnit.SECONDS));
1732         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1733
1734         leaderActorContext.setReplicatedLog(
1735                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1736         long leaderCommitIndex = 3;
1737         leaderActorContext.setCommitIndex(leaderCommitIndex);
1738         leaderActorContext.setLastApplied(leaderCommitIndex);
1739
1740         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1741         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1742         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1743         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1744
1745         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1746
1747         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1748         followerActorContext.setCommitIndex(-1);
1749         followerActorContext.setLastApplied(-1);
1750
1751         Follower follower = new Follower(followerActorContext);
1752         followerActor.underlyingActor().setBehavior(follower);
1753         followerActorContext.setCurrentBehavior(follower);
1754
1755         leader = new Leader(leaderActorContext);
1756
1757         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1758         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1759
1760         MessageCollectorActor.clearMessages(followerActor);
1761         MessageCollectorActor.clearMessages(leaderActor);
1762
1763         // Verify initial AppendEntries sent with the leader's current commit index.
1764         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1765         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1766         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1767
1768         leaderActor.underlyingActor().setBehavior(leader);
1769         leaderActorContext.setCurrentBehavior(leader);
1770
1771         leader.handleMessage(followerActor, appendEntriesReply);
1772
1773         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1774         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1775
1776         appendEntries = appendEntriesList.get(0);
1777         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1778         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1779         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1780
1781         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1782         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1783                 appendEntries.getEntries().get(0).getData());
1784         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1785         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1786                 appendEntries.getEntries().get(1).getData());
1787
1788         appendEntries = appendEntriesList.get(1);
1789         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1790         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1791         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1792
1793         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1794         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1795                 appendEntries.getEntries().get(0).getData());
1796         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1797         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1798                 appendEntries.getEntries().get(1).getData());
1799
1800         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1801         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1802
1803         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1804
1805         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1806         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1807     }
1808
1809     @Test
1810     public void testHandleRequestVoteReply(){
1811         logStart("testHandleRequestVoteReply");
1812
1813         MockRaftActorContext leaderActorContext = createActorContext();
1814
1815         leader = new Leader(leaderActorContext);
1816
1817         // Should be a no-op.
1818         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1819                 new RequestVoteReply(1, true));
1820
1821         assertEquals(RaftState.Leader, raftActorBehavior.state());
1822
1823         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1824
1825         assertEquals(RaftState.Leader, raftActorBehavior.state());
1826     }
1827
1828     @Test
1829     public void testIsolatedLeaderCheckNoFollowers() {
1830         logStart("testIsolatedLeaderCheckNoFollowers");
1831
1832         MockRaftActorContext leaderActorContext = createActorContext();
1833
1834         leader = new Leader(leaderActorContext);
1835         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1836         assertTrue(behavior instanceof Leader);
1837     }
1838
1839     @Test
1840     public void testIsolatedLeaderCheckNoVotingFollowers() {
1841         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1842
1843         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1844         Follower follower = new Follower(followerActorContext);
1845         followerActor.underlyingActor().setBehavior(follower);
1846
1847         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1848         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1849                 new FiniteDuration(1000, TimeUnit.SECONDS));
1850         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1851
1852         leader = new Leader(leaderActorContext);
1853         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1854         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1855         assertTrue("Expected Leader", behavior instanceof Leader);
1856     }
1857
1858     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1859         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1860         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1861
1862         MockRaftActorContext leaderActorContext = createActorContext();
1863
1864         Map<String, String> peerAddresses = new HashMap<>();
1865         peerAddresses.put("follower-1", followerActor1.path().toString());
1866         peerAddresses.put("follower-2", followerActor2.path().toString());
1867
1868         leaderActorContext.setPeerAddresses(peerAddresses);
1869         leaderActorContext.setRaftPolicy(raftPolicy);
1870
1871         leader = new Leader(leaderActorContext);
1872
1873         leader.markFollowerActive("follower-1");
1874         leader.markFollowerActive("follower-2");
1875         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1876         assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1877
1878         // kill 1 follower and verify if that got killed
1879         final JavaTestKit probe = new JavaTestKit(getSystem());
1880         probe.watch(followerActor1);
1881         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1882         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1883         assertEquals(termMsg1.getActor(), followerActor1);
1884
1885         leader.markFollowerInActive("follower-1");
1886         leader.markFollowerActive("follower-2");
1887         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1888         assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1889
1890         // kill 2nd follower and leader should change to Isolated leader
1891         followerActor2.tell(PoisonPill.getInstance(), null);
1892         probe.watch(followerActor2);
1893         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1894         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1895         assertEquals(termMsg2.getActor(), followerActor2);
1896
1897         leader.markFollowerInActive("follower-2");
1898         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1899     }
1900
1901     @Test
1902     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1903         logStart("testIsolatedLeaderCheckTwoFollowers");
1904
1905         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1906
1907         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1908             behavior instanceof IsolatedLeader);
1909     }
1910
1911     @Test
1912     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1913         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1914
1915         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1916
1917         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1918                 behavior instanceof Leader);
1919     }
1920
1921     @Test
1922     public void testLaggingFollowerStarvation() throws Exception {
1923         logStart("testLaggingFollowerStarvation");
1924         new JavaTestKit(getSystem()) {{
1925             String leaderActorId = actorFactory.generateActorId("leader");
1926             String follower1ActorId = actorFactory.generateActorId("follower");
1927             String follower2ActorId = actorFactory.generateActorId("follower");
1928
1929             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1930                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1931             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1932             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1933
1934             MockRaftActorContext leaderActorContext =
1935                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1936
1937             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1938             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1939             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1940
1941             leaderActorContext.setConfigParams(configParams);
1942
1943             leaderActorContext.setReplicatedLog(
1944                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1945
1946             Map<String, String> peerAddresses = new HashMap<>();
1947             peerAddresses.put(follower1ActorId,
1948                     follower1Actor.path().toString());
1949             peerAddresses.put(follower2ActorId,
1950                     follower2Actor.path().toString());
1951
1952             leaderActorContext.setPeerAddresses(peerAddresses);
1953             leaderActorContext.getTermInformation().update(1, leaderActorId);
1954
1955             RaftActorBehavior leader = createBehavior(leaderActorContext);
1956
1957             leaderActor.underlyingActor().setBehavior(leader);
1958
1959             for(int i=1;i<6;i++) {
1960                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1961                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1962                 assertTrue(newBehavior == leader);
1963                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1964             }
1965
1966             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1967             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1968
1969             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1970                     heartbeats.size() > 1);
1971
1972             // Check if follower-2 got AppendEntries during this time and was not starved
1973             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1974
1975             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1976                     appendEntries.size() > 1);
1977
1978         }};
1979     }
1980
1981     @Test
1982     public void testReplicationConsensusWithNonVotingFollower() {
1983         logStart("testReplicationConsensusWithNonVotingFollower");
1984
1985         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1986         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1987                 new FiniteDuration(1000, TimeUnit.SECONDS));
1988
1989         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1990
1991         String nonVotingFollowerId = "nonvoting-follower";
1992         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1993                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1994
1995         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1996
1997         leader = new Leader(leaderActorContext);
1998         leaderActorContext.setCurrentBehavior(leader);
1999
2000         // Ignore initial heartbeats
2001         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2002         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2003
2004         MessageCollectorActor.clearMessages(followerActor);
2005         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2006         MessageCollectorActor.clearMessages(leaderActor);
2007
2008         // Send a Replicate message and wait for AppendEntries.
2009         sendReplicate(leaderActorContext, 0);
2010
2011         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2013
2014         // Send reply only from the voting follower and verify consensus via ApplyState.
2015         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2016
2017         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2018
2019         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2020
2021         MessageCollectorActor.clearMessages(followerActor);
2022         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2023         MessageCollectorActor.clearMessages(leaderActor);
2024
2025         // Send another Replicate message
2026         sendReplicate(leaderActorContext, 1);
2027
2028         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2029         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2030                 AppendEntries.class);
2031         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2032         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2033
2034         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2035         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2036
2037         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2038
2039         // Send reply from the voting follower and verify consensus.
2040         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2041
2042         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2043     }
2044
2045     @Test
2046     public void testTransferLeadershipWithFollowerInSync() {
2047         logStart("testTransferLeadershipWithFollowerInSync");
2048
2049         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2051                 new FiniteDuration(1000, TimeUnit.SECONDS));
2052         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2053
2054         leader = new Leader(leaderActorContext);
2055         leaderActorContext.setCurrentBehavior(leader);
2056
2057         // Initial heartbeat
2058         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2060         MessageCollectorActor.clearMessages(followerActor);
2061
2062         sendReplicate(leaderActorContext, 0);
2063         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2064
2065         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2066         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2067         MessageCollectorActor.clearMessages(followerActor);
2068
2069         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2070         leader.transferLeadership(mockTransferCohort);
2071
2072         verify(mockTransferCohort, never()).transferComplete();
2073         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2075
2076         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2077         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2078
2079         // Leader should force an election timeout
2080         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2081
2082         verify(mockTransferCohort).transferComplete();
2083     }
2084
2085     @Test
2086     public void testTransferLeadershipWithEmptyLog() {
2087         logStart("testTransferLeadershipWithEmptyLog");
2088
2089         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2090         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2091                 new FiniteDuration(1000, TimeUnit.SECONDS));
2092         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2093
2094         leader = new Leader(leaderActorContext);
2095         leaderActorContext.setCurrentBehavior(leader);
2096
2097         // Initial heartbeat
2098         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100         MessageCollectorActor.clearMessages(followerActor);
2101
2102         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2103         leader.transferLeadership(mockTransferCohort);
2104
2105         verify(mockTransferCohort, never()).transferComplete();
2106         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2107         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2108
2109         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2110         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2111
2112         // Leader should force an election timeout
2113         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2114
2115         verify(mockTransferCohort).transferComplete();
2116     }
2117
2118     @Test
2119     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2120         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2121
2122         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2123         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2124                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2125
2126         leader = new Leader(leaderActorContext);
2127         leaderActorContext.setCurrentBehavior(leader);
2128
2129         // Initial heartbeat
2130         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2131         MessageCollectorActor.clearMessages(followerActor);
2132
2133         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2134         leader.transferLeadership(mockTransferCohort);
2135
2136         verify(mockTransferCohort, never()).transferComplete();
2137
2138         // Sync up the follower.
2139         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2141         MessageCollectorActor.clearMessages(followerActor);
2142
2143         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2144                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2145         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2146         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2148
2149         // Leader should force an election timeout
2150         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2151
2152         verify(mockTransferCohort).transferComplete();
2153     }
2154
2155     @Test
2156     public void testTransferLeadershipWithFollowerSyncTimeout() {
2157         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2158
2159         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2162         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2163         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2164
2165         leader = new Leader(leaderActorContext);
2166         leaderActorContext.setCurrentBehavior(leader);
2167
2168         // Initial heartbeat
2169         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2170         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2171         MessageCollectorActor.clearMessages(followerActor);
2172
2173         sendReplicate(leaderActorContext, 0);
2174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2175
2176         MessageCollectorActor.clearMessages(followerActor);
2177
2178         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2179         leader.transferLeadership(mockTransferCohort);
2180
2181         verify(mockTransferCohort, never()).transferComplete();
2182
2183         // Send heartbeats to time out the transfer.
2184         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2185             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2186                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2187             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2188         }
2189
2190         verify(mockTransferCohort).abortTransfer();
2191         verify(mockTransferCohort, never()).transferComplete();
2192         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2193     }
2194
2195     @Override
2196     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2197             ActorRef actorRef, RaftRPC rpc) throws Exception {
2198         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2199         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2200     }
2201
2202     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2203
2204         private final long electionTimeOutIntervalMillis;
2205         private final int snapshotChunkSize;
2206
2207         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2208             super();
2209             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2210             this.snapshotChunkSize = snapshotChunkSize;
2211         }
2212
2213         @Override
2214         public FiniteDuration getElectionTimeOutInterval() {
2215             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2216         }
2217
2218         @Override
2219         public int getSnapshotChunkSize() {
2220             return snapshotChunkSize;
2221         }
2222     }
2223 }