Bug 6540: Move LeaderInstallSnapshotState to FollowerLogInformation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.google.protobuf.ByteString;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
39 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
42 import org.opendaylight.controller.cluster.raft.RaftState;
43 import org.opendaylight.controller.cluster.raft.RaftVersions;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class LeaderTest extends AbstractLeaderTest<Leader> {
70
71     static final String FOLLOWER_ID = "follower";
72     public static final String LEADER_ID = "leader";
73
74     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
75             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
76
77     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
79
80     private Leader leader;
81     private final short payloadVersion = 5;
82
83     @Override
84     @After
85     public void tearDown() throws Exception {
86         if(leader != null) {
87             leader.close();
88         }
89
90         super.tearDown();
91     }
92
93     @Test
94     public void testHandleMessageForUnknownMessage() throws Exception {
95         logStart("testHandleMessageForUnknownMessage");
96
97         leader = new Leader(createActorContext());
98
99         // handle message should null when it receives an unknown message
100         assertNull(leader.handleMessage(followerActor, "foo"));
101     }
102
103     @Test
104     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106
107         MockRaftActorContext actorContext = createActorContextWithFollower();
108         actorContext.setCommitIndex(-1);
109         short payloadVersion = (short)5;
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116         actorContext.setCurrentBehavior(leader);
117
118         // Leader should send an immediate heartbeat with no entries as follower is inactive.
119         long lastIndex = actorContext.getReplicatedLog().lastIndex();
120         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getTerm", term, appendEntries.getTerm());
122         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 0, appendEntries.getEntries().size());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126
127         // The follower would normally reply - simulate that explicitly here.
128         leader.handleMessage(followerActor, new AppendEntriesReply(
129                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
131
132         followerActor.underlyingActor().clear();
133
134         // Sleep for the heartbeat interval so AppendEntries is sent.
135         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137
138         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139
140         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143         assertEquals("Entries size", 1, appendEntries.getEntries().size());
144         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147     }
148
149
150     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151         return sendReplicate(actorContext, 1, index);
152     }
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157                 term, index, payload);
158         actorContext.getReplicatedLog().append(newEntry);
159         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160     }
161
162     @Test
163     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165
166         MockRaftActorContext actorContext = createActorContextWithFollower();
167
168         long term = 1;
169         actorContext.getTermInformation().update(term, "");
170
171         leader = new Leader(actorContext);
172
173         // Leader will send an immediate heartbeat - ignore it.
174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175
176         // The follower would normally reply - simulate that explicitly here.
177         long lastIndex = actorContext.getReplicatedLog().lastIndex();
178         leader.handleMessage(followerActor, new AppendEntriesReply(
179                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
181
182         followerActor.underlyingActor().clear();
183
184         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185
186         // State should not change
187         assertTrue(raftBehavior instanceof Leader);
188
189         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192         assertEquals("Entries size", 1, appendEntries.getEntries().size());
193         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
197     }
198
199     @Test
200     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202
203         MockRaftActorContext actorContext = createActorContextWithFollower();
204         actorContext.setCommitIndex(-1);
205
206         // The raft context is initialized with a couple log entries. However the commitIndex
207         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
208         // committed and applied. Now it regains leadership with a higher term (2).
209         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
210         long newTerm = prevTerm + 1;
211         actorContext.getTermInformation().update(newTerm, "");
212
213         leader = new Leader(actorContext);
214         actorContext.setCurrentBehavior(leader);
215
216         // Leader will send an immediate heartbeat - ignore it.
217         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
218
219         // The follower replies with the leader's current last index and term, simulating that it is
220         // up to date with the leader.
221         long lastIndex = actorContext.getReplicatedLog().lastIndex();
222         leader.handleMessage(followerActor, new AppendEntriesReply(
223                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
224
225         // The commit index should not get updated even though consensus was reached. This is b/c the
226         // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
227         // from previous terms by counting replicas".
228         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
229
230         followerActor.underlyingActor().clear();
231
232         // Now replicate a new entry with the new term 2.
233         long newIndex = lastIndex + 1;
234         sendReplicate(actorContext, newTerm, newIndex);
235
236         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
238         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
239         assertEquals("Entries size", 1, appendEntries.getEntries().size());
240         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
241         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
242         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
243
244         // The follower replies with success. The leader should now update the commit index to the new index
245         // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
246         // prior entries are committed indirectly".
247         leader.handleMessage(followerActor, new AppendEntriesReply(
248                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
249
250         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
251     }
252
253     @Test
254     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
255         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
256
257         MockRaftActorContext actorContext = createActorContextWithFollower();
258         actorContext.setRaftPolicy(createRaftPolicy(true, true));
259
260         long term = 1;
261         actorContext.getTermInformation().update(term, "");
262
263         leader = new Leader(actorContext);
264
265         // Leader will send an immediate heartbeat - ignore it.
266         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
267
268         // The follower would normally reply - simulate that explicitly here.
269         long lastIndex = actorContext.getReplicatedLog().lastIndex();
270         leader.handleMessage(followerActor, new AppendEntriesReply(
271                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
272         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
273
274         followerActor.underlyingActor().clear();
275
276         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
277
278         // State should not change
279         assertTrue(raftBehavior instanceof Leader);
280
281         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
283         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
284         assertEquals("Entries size", 1, appendEntries.getEntries().size());
285         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
286         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
287         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
288         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
289     }
290
291     @Test
292     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
293         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
294
295         MockRaftActorContext actorContext = createActorContextWithFollower();
296         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
297             @Override
298             public FiniteDuration getHeartBeatInterval() {
299                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
300             }
301         });
302
303         long term = 1;
304         actorContext.getTermInformation().update(term, "");
305
306         leader = new Leader(actorContext);
307
308         // Leader will send an immediate heartbeat - ignore it.
309         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
310
311         // The follower would normally reply - simulate that explicitly here.
312         long lastIndex = actorContext.getReplicatedLog().lastIndex();
313         leader.handleMessage(followerActor, new AppendEntriesReply(
314                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
315         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
316
317         followerActor.underlyingActor().clear();
318
319         for(int i=0;i<5;i++) {
320             sendReplicate(actorContext, lastIndex+i+1);
321         }
322
323         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
324         // We expect only 1 message to be sent because of two reasons,
325         // - an append entries reply was not received
326         // - the heartbeat interval has not expired
327         // In this scenario if multiple messages are sent they would likely be duplicates
328         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
329     }
330
331     @Test
332     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
333         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
334
335         MockRaftActorContext actorContext = createActorContextWithFollower();
336         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
337             @Override
338             public FiniteDuration getHeartBeatInterval() {
339                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
340             }
341         });
342
343         long term = 1;
344         actorContext.getTermInformation().update(term, "");
345
346         leader = new Leader(actorContext);
347
348         // Leader will send an immediate heartbeat - ignore it.
349         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
350
351         // The follower would normally reply - simulate that explicitly here.
352         long lastIndex = actorContext.getReplicatedLog().lastIndex();
353         leader.handleMessage(followerActor, new AppendEntriesReply(
354                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
355         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
356
357         followerActor.underlyingActor().clear();
358
359         for(int i=0;i<3;i++) {
360             sendReplicate(actorContext, lastIndex+i+1);
361             leader.handleMessage(followerActor, new AppendEntriesReply(
362                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
363
364         }
365
366         for(int i=3;i<5;i++) {
367             sendReplicate(actorContext, lastIndex + i + 1);
368         }
369
370         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
371         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
372         // get sent to the follower - but not the 5th
373         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
374
375         for(int i=0;i<4;i++) {
376             long expected = allMessages.get(i).getEntries().get(0).getIndex();
377             assertEquals(expected, i+2);
378         }
379     }
380
381     @Test
382     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
383         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
384
385         MockRaftActorContext actorContext = createActorContextWithFollower();
386         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
387             @Override
388             public FiniteDuration getHeartBeatInterval() {
389                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
390             }
391         });
392
393         long term = 1;
394         actorContext.getTermInformation().update(term, "");
395
396         leader = new Leader(actorContext);
397
398         // Leader will send an immediate heartbeat - ignore it.
399         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
400
401         // The follower would normally reply - simulate that explicitly here.
402         long lastIndex = actorContext.getReplicatedLog().lastIndex();
403         leader.handleMessage(followerActor, new AppendEntriesReply(
404                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
405         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
406
407         followerActor.underlyingActor().clear();
408
409         sendReplicate(actorContext, lastIndex+1);
410
411         // Wait slightly longer than heartbeat duration
412         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
413
414         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
415
416         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
417         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
418
419         assertEquals(1, allMessages.get(0).getEntries().size());
420         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
421         assertEquals(1, allMessages.get(1).getEntries().size());
422         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
423
424     }
425
426     @Test
427     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
428         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
432             @Override
433             public FiniteDuration getHeartBeatInterval() {
434                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
435             }
436         });
437
438         long term = 1;
439         actorContext.getTermInformation().update(term, "");
440
441         leader = new Leader(actorContext);
442
443         // Leader will send an immediate heartbeat - ignore it.
444         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
445
446         // The follower would normally reply - simulate that explicitly here.
447         long lastIndex = actorContext.getReplicatedLog().lastIndex();
448         leader.handleMessage(followerActor, new AppendEntriesReply(
449                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
450         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
451
452         followerActor.underlyingActor().clear();
453
454         for(int i=0;i<3;i++) {
455             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
456             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
457         }
458
459         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
460         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
461     }
462
463     @Test
464     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
465         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
466
467         MockRaftActorContext actorContext = createActorContextWithFollower();
468         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
469             @Override
470             public FiniteDuration getHeartBeatInterval() {
471                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
472             }
473         });
474
475         long term = 1;
476         actorContext.getTermInformation().update(term, "");
477
478         leader = new Leader(actorContext);
479
480         // Leader will send an immediate heartbeat - ignore it.
481         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482
483         // The follower would normally reply - simulate that explicitly here.
484         long lastIndex = actorContext.getReplicatedLog().lastIndex();
485         leader.handleMessage(followerActor, new AppendEntriesReply(
486                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
487         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
488
489         followerActor.underlyingActor().clear();
490
491         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493         sendReplicate(actorContext, lastIndex+1);
494
495         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
497
498         assertEquals(0, allMessages.get(0).getEntries().size());
499         assertEquals(1, allMessages.get(1).getEntries().size());
500     }
501
502
503     @Test
504     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
505         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
506
507         MockRaftActorContext actorContext = createActorContext();
508
509         leader = new Leader(actorContext);
510
511         actorContext.setLastApplied(0);
512
513         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
514         long term = actorContext.getTermInformation().getCurrentTerm();
515         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
516                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
517
518         actorContext.getReplicatedLog().append(newEntry);
519
520         final Identifier id = new MockIdentifier("state-id");
521         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
522
523         // State should not change
524         assertTrue(raftBehavior instanceof Leader);
525
526         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
527
528         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
529         // one since lastApplied state is 0.
530         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
531                 leaderActor, ApplyState.class);
532         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
533
534         for(int i = 0; i <= newLogIndex - 1; i++ ) {
535             ApplyState applyState = applyStateList.get(i);
536             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
537             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
538         }
539
540         ApplyState last = applyStateList.get((int) newLogIndex - 1);
541         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
542         assertEquals("getIdentifier", id, last.getIdentifier());
543     }
544
545     @Test
546     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
547         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
548
549         MockRaftActorContext actorContext = createActorContextWithFollower();
550
551         Map<String, String> leadersSnapshot = new HashMap<>();
552         leadersSnapshot.put("1", "A");
553         leadersSnapshot.put("2", "B");
554         leadersSnapshot.put("3", "C");
555
556         //clears leaders log
557         actorContext.getReplicatedLog().removeFrom(0);
558
559         final int commitIndex = 3;
560         final int snapshotIndex = 2;
561         final int newEntryIndex = 4;
562         final int snapshotTerm = 1;
563         final int currentTerm = 2;
564
565         // set the snapshot variables in replicatedlog
566         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
567         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
568         actorContext.setCommitIndex(commitIndex);
569         //set follower timeout to 2 mins, helps during debugging
570         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
571
572         leader = new Leader(actorContext);
573
574         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
575         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
576
577         // new entry
578         ReplicatedLogImplEntry entry =
579                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580                         new MockRaftActorContext.MockPayload("D"));
581
582         //update follower timestamp
583         leader.markFollowerActive(FOLLOWER_ID);
584
585         ByteString bs = toByteString(leadersSnapshot);
586         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
587                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
588         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
589                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
590         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
591
592         //send first chunk and no InstallSnapshotReply received yet
593         fts.getNextChunk();
594         fts.incrementChunkIndex();
595
596         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
597                 TimeUnit.MILLISECONDS);
598
599         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
600
601         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
602
603         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
604
605         //InstallSnapshotReply received
606         fts.markSendStatus(true);
607
608         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
609
610         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
611
612         assertEquals(commitIndex, is.getLastIncludedIndex());
613     }
614
615     @Test
616     public void testSendAppendEntriesSnapshotScenario() throws Exception {
617         logStart("testSendAppendEntriesSnapshotScenario");
618
619         MockRaftActorContext actorContext = createActorContextWithFollower();
620
621         Map<String, String> leadersSnapshot = new HashMap<>();
622         leadersSnapshot.put("1", "A");
623         leadersSnapshot.put("2", "B");
624         leadersSnapshot.put("3", "C");
625
626         //clears leaders log
627         actorContext.getReplicatedLog().removeFrom(0);
628
629         final int followersLastIndex = 2;
630         final int snapshotIndex = 3;
631         final int newEntryIndex = 4;
632         final int snapshotTerm = 1;
633         final int currentTerm = 2;
634
635         // set the snapshot variables in replicatedlog
636         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638         actorContext.setCommitIndex(followersLastIndex);
639
640         leader = new Leader(actorContext);
641
642         // Leader will send an immediate heartbeat - ignore it.
643         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
644
645         // new entry
646         ReplicatedLogImplEntry entry =
647                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
648                         new MockRaftActorContext.MockPayload("D"));
649
650         actorContext.getReplicatedLog().append(entry);
651
652         //update follower timestamp
653         leader.markFollowerActive(FOLLOWER_ID);
654
655         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
656         RaftActorBehavior raftBehavior = leader.handleMessage(
657                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
658
659         assertTrue(raftBehavior instanceof Leader);
660
661         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
662     }
663
664     @Test
665     public void testInitiateInstallSnapshot() throws Exception {
666         logStart("testInitiateInstallSnapshot");
667
668         MockRaftActorContext actorContext = createActorContextWithFollower();
669
670         //clears leaders log
671         actorContext.getReplicatedLog().removeFrom(0);
672
673         final int followersLastIndex = 2;
674         final int snapshotIndex = 3;
675         final int newEntryIndex = 4;
676         final int snapshotTerm = 1;
677         final int currentTerm = 2;
678
679         // set the snapshot variables in replicatedlog
680         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
681         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
682         actorContext.setLastApplied(3);
683         actorContext.setCommitIndex(followersLastIndex);
684
685         leader = new Leader(actorContext);
686
687         // Leader will send an immediate heartbeat - ignore it.
688         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689
690         // set the snapshot as absent and check if capture-snapshot is invoked.
691         leader.setSnapshot(null);
692
693         // new entry
694         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
695                 new MockRaftActorContext.MockPayload("D"));
696
697         actorContext.getReplicatedLog().append(entry);
698
699         //update follower timestamp
700         leader.markFollowerActive(FOLLOWER_ID);
701
702         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
703
704         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
705
706         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
707
708         assertTrue(cs.isInstallSnapshotInitiated());
709         assertEquals(3, cs.getLastAppliedIndex());
710         assertEquals(1, cs.getLastAppliedTerm());
711         assertEquals(4, cs.getLastIndex());
712         assertEquals(2, cs.getLastTerm());
713
714         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
715         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
716
717         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
718     }
719
720     @Test
721     public void testInitiateForceInstallSnapshot() throws Exception {
722         logStart("testInitiateForceInstallSnapshot");
723
724         MockRaftActorContext actorContext = createActorContextWithFollower();
725
726         final int followersLastIndex = 2;
727         final int snapshotIndex = -1;
728         final int newEntryIndex = 4;
729         final int snapshotTerm = -1;
730         final int currentTerm = 2;
731
732         // set the snapshot variables in replicatedlog
733         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
734         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
735         actorContext.setLastApplied(3);
736         actorContext.setCommitIndex(followersLastIndex);
737
738         actorContext.getReplicatedLog().removeFrom(0);
739
740         leader = new Leader(actorContext);
741
742         // Leader will send an immediate heartbeat - ignore it.
743         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
744
745         // set the snapshot as absent and check if capture-snapshot is invoked.
746         leader.setSnapshot(null);
747
748         for(int i=0;i<4;i++) {
749             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
750                     new MockRaftActorContext.MockPayload("X" + i)));
751         }
752
753         // new entry
754         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
755                 new MockRaftActorContext.MockPayload("D"));
756
757         actorContext.getReplicatedLog().append(entry);
758
759         //update follower timestamp
760         leader.markFollowerActive(FOLLOWER_ID);
761
762         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
763         // installed with a SendInstallSnapshot
764         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
765
766         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
767
768         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
769
770         assertTrue(cs.isInstallSnapshotInitiated());
771         assertEquals(3, cs.getLastAppliedIndex());
772         assertEquals(1, cs.getLastAppliedTerm());
773         assertEquals(4, cs.getLastIndex());
774         assertEquals(2, cs.getLastTerm());
775
776         // if an initiate is started again when first is in progress, it should not initiate Capture
777         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
778
779         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
780     }
781
782
783     @Test
784     public void testInstallSnapshot() throws Exception {
785         logStart("testInstallSnapshot");
786
787         MockRaftActorContext actorContext = createActorContextWithFollower();
788
789         Map<String, String> leadersSnapshot = new HashMap<>();
790         leadersSnapshot.put("1", "A");
791         leadersSnapshot.put("2", "B");
792         leadersSnapshot.put("3", "C");
793
794         //clears leaders log
795         actorContext.getReplicatedLog().removeFrom(0);
796
797         final int lastAppliedIndex = 3;
798         final int snapshotIndex = 2;
799         final int snapshotTerm = 1;
800         final int currentTerm = 2;
801
802         // set the snapshot variables in replicatedlog
803         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806         actorContext.setCommitIndex(lastAppliedIndex);
807         actorContext.setLastApplied(lastAppliedIndex);
808
809         leader = new Leader(actorContext);
810
811         // Initial heartbeat.
812         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
813
814         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
815         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
816
817         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
818                 Collections.<ReplicatedLogEntry>emptyList(),
819                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
820
821         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
822
823         assertTrue(raftBehavior instanceof Leader);
824
825         // check if installsnapshot gets called with the correct values.
826
827         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
828
829         assertNotNull(installSnapshot.getData());
830         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
831         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
832
833         assertEquals(currentTerm, installSnapshot.getTerm());
834     }
835
836     @Test
837     public void testForceInstallSnapshot() throws Exception {
838         logStart("testForceInstallSnapshot");
839
840         MockRaftActorContext actorContext = createActorContextWithFollower();
841
842         Map<String, String> leadersSnapshot = new HashMap<>();
843         leadersSnapshot.put("1", "A");
844         leadersSnapshot.put("2", "B");
845         leadersSnapshot.put("3", "C");
846
847         final int lastAppliedIndex = 3;
848         final int snapshotIndex = -1;
849         final int snapshotTerm = -1;
850         final int currentTerm = 2;
851
852         // set the snapshot variables in replicatedlog
853         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856         actorContext.setCommitIndex(lastAppliedIndex);
857         actorContext.setLastApplied(lastAppliedIndex);
858
859         leader = new Leader(actorContext);
860
861         // Initial heartbeat.
862         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
863
864         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
866
867         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
868                 Collections.<ReplicatedLogEntry>emptyList(),
869                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
870
871         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
872
873         assertTrue(raftBehavior instanceof Leader);
874
875         // check if installsnapshot gets called with the correct values.
876
877         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
878
879         assertNotNull(installSnapshot.getData());
880         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
881         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
882
883         assertEquals(currentTerm, installSnapshot.getTerm());
884     }
885
886     @Test
887     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
888         logStart("testHandleInstallSnapshotReplyLastChunk");
889
890         MockRaftActorContext actorContext = createActorContextWithFollower();
891
892         final int commitIndex = 3;
893         final int snapshotIndex = 2;
894         final int snapshotTerm = 1;
895         final int currentTerm = 2;
896
897         actorContext.setCommitIndex(commitIndex);
898
899         leader = new Leader(actorContext);
900         actorContext.setCurrentBehavior(leader);
901
902         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
903         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
904
905         // Ignore initial heartbeat.
906         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
907
908         Map<String, String> leadersSnapshot = new HashMap<>();
909         leadersSnapshot.put("1", "A");
910         leadersSnapshot.put("2", "B");
911         leadersSnapshot.put("3", "C");
912
913         // set the snapshot variables in replicatedlog
914
915         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
916         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
917         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
918
919         ByteString bs = toByteString(leadersSnapshot);
920         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
921                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
922         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
923                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
924         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
925         while(!fts.isLastChunk(fts.getChunkIndex())) {
926             fts.getNextChunk();
927             fts.incrementChunkIndex();
928         }
929
930         //clears leaders log
931         actorContext.getReplicatedLog().removeFrom(0);
932
933         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
934                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
935
936         assertTrue(raftBehavior instanceof Leader);
937
938         assertEquals(1, leader.followerLogSize());
939         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
940         assertNotNull(fli);
941         assertNull(fli.getInstallSnapshotState());
942         assertEquals(commitIndex, fli.getMatchIndex());
943         assertEquals(commitIndex + 1, fli.getNextIndex());
944         assertFalse(leader.hasSnapshot());
945     }
946
947     @Test
948     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
949         logStart("testSendSnapshotfromInstallSnapshotReply");
950
951         MockRaftActorContext actorContext = createActorContextWithFollower();
952
953         final int commitIndex = 3;
954         final int snapshotIndex = 2;
955         final int snapshotTerm = 1;
956         final int currentTerm = 2;
957
958         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
959             @Override
960             public int getSnapshotChunkSize() {
961                 return 50;
962             }
963         };
964         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
965         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
966
967         actorContext.setConfigParams(configParams);
968         actorContext.setCommitIndex(commitIndex);
969
970         leader = new Leader(actorContext);
971         actorContext.setCurrentBehavior(leader);
972
973         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
974         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
975
976         Map<String, String> leadersSnapshot = new HashMap<>();
977         leadersSnapshot.put("1", "A");
978         leadersSnapshot.put("2", "B");
979         leadersSnapshot.put("3", "C");
980
981         // set the snapshot variables in replicatedlog
982         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
983         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
984         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
985
986         ByteString bs = toByteString(leadersSnapshot);
987         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
988                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
989         leader.setSnapshot(snapshot);
990
991         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
992
993         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
994
995         assertEquals(1, installSnapshot.getChunkIndex());
996         assertEquals(3, installSnapshot.getTotalChunks());
997
998         followerActor.underlyingActor().clear();
999         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1000                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1001
1002         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1003
1004         assertEquals(2, installSnapshot.getChunkIndex());
1005         assertEquals(3, installSnapshot.getTotalChunks());
1006
1007         followerActor.underlyingActor().clear();
1008         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1010
1011         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1012
1013         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1014         followerActor.underlyingActor().clear();
1015         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1016                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1017
1018         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1019
1020         assertNull(installSnapshot);
1021     }
1022
1023
1024     @Test
1025     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1026         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1027
1028         MockRaftActorContext actorContext = createActorContextWithFollower();
1029
1030         final int commitIndex = 3;
1031         final int snapshotIndex = 2;
1032         final int snapshotTerm = 1;
1033         final int currentTerm = 2;
1034
1035         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1036             @Override
1037             public int getSnapshotChunkSize() {
1038                 return 50;
1039             }
1040         });
1041
1042         actorContext.setCommitIndex(commitIndex);
1043
1044         leader = new Leader(actorContext);
1045
1046         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1048
1049         Map<String, String> leadersSnapshot = new HashMap<>();
1050         leadersSnapshot.put("1", "A");
1051         leadersSnapshot.put("2", "B");
1052         leadersSnapshot.put("3", "C");
1053
1054         // set the snapshot variables in replicatedlog
1055         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1058
1059         ByteString bs = toByteString(leadersSnapshot);
1060         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062         leader.setSnapshot(snapshot);
1063
1064         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1065         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1066
1067         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1068
1069         assertEquals(1, installSnapshot.getChunkIndex());
1070         assertEquals(3, installSnapshot.getTotalChunks());
1071
1072         followerActor.underlyingActor().clear();
1073
1074         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1075                 FOLLOWER_ID, -1, false));
1076
1077         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1078                 TimeUnit.MILLISECONDS);
1079
1080         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1081
1082         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1083
1084         assertEquals(1, installSnapshot.getChunkIndex());
1085         assertEquals(3, installSnapshot.getTotalChunks());
1086     }
1087
1088     @Test
1089     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1090         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1091
1092         MockRaftActorContext actorContext = createActorContextWithFollower();
1093
1094         final int commitIndex = 3;
1095         final int snapshotIndex = 2;
1096         final int snapshotTerm = 1;
1097         final int currentTerm = 2;
1098
1099         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1100             @Override
1101             public int getSnapshotChunkSize() {
1102                 return 50;
1103             }
1104         });
1105
1106         actorContext.setCommitIndex(commitIndex);
1107
1108         leader = new Leader(actorContext);
1109
1110         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1111         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1112
1113         Map<String, String> leadersSnapshot = new HashMap<>();
1114         leadersSnapshot.put("1", "A");
1115         leadersSnapshot.put("2", "B");
1116         leadersSnapshot.put("3", "C");
1117
1118         // set the snapshot variables in replicatedlog
1119         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1120         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1121         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1122
1123         ByteString bs = toByteString(leadersSnapshot);
1124         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1125                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1126         leader.setSnapshot(snapshot);
1127
1128         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1129
1130         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1131
1132         assertEquals(1, installSnapshot.getChunkIndex());
1133         assertEquals(3, installSnapshot.getTotalChunks());
1134         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1135                 installSnapshot.getLastChunkHashCode().get().intValue());
1136
1137         int hashCode = Arrays.hashCode(installSnapshot.getData());
1138
1139         followerActor.underlyingActor().clear();
1140
1141         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1142                 FOLLOWER_ID, 1, true));
1143
1144         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1145
1146         assertEquals(2, installSnapshot.getChunkIndex());
1147         assertEquals(3, installSnapshot.getTotalChunks());
1148         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1149     }
1150
1151     @Test
1152     public void testLeaderInstallSnapshotState() {
1153         logStart("testLeaderInstallSnapshotState");
1154
1155         Map<String, String> leadersSnapshot = new HashMap<>();
1156         leadersSnapshot.put("1", "A");
1157         leadersSnapshot.put("2", "B");
1158         leadersSnapshot.put("3", "C");
1159
1160         ByteString bs = toByteString(leadersSnapshot);
1161         byte[] barray = bs.toByteArray();
1162
1163         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
1164
1165         assertEquals(bs.size(), barray.length);
1166
1167         int chunkIndex=0;
1168         for (int i=0; i < barray.length; i = i + 50) {
1169             int j = i + 50;
1170             chunkIndex++;
1171
1172             if (i + 50 > barray.length) {
1173                 j = barray.length;
1174             }
1175
1176             byte[] chunk = fts.getNextChunk();
1177             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1178             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1179
1180             fts.markSendStatus(true);
1181             if (!fts.isLastChunk(chunkIndex)) {
1182                 fts.incrementChunkIndex();
1183             }
1184         }
1185
1186         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1187     }
1188
1189     @Override
1190     protected Leader createBehavior(final RaftActorContext actorContext) {
1191         return new Leader(actorContext);
1192     }
1193
1194     @Override
1195     protected MockRaftActorContext createActorContext() {
1196         return createActorContext(leaderActor);
1197     }
1198
1199     @Override
1200     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1201         return createActorContext(LEADER_ID, actorRef);
1202     }
1203
1204     private MockRaftActorContext createActorContextWithFollower() {
1205         MockRaftActorContext actorContext = createActorContext();
1206         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1207                 followerActor.path().toString()).build());
1208         return actorContext;
1209     }
1210
1211     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1212         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1213         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1214         configParams.setElectionTimeoutFactor(100000);
1215         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1216         context.setConfigParams(configParams);
1217         context.setPayloadVersion(payloadVersion);
1218         return context;
1219     }
1220
1221     private MockRaftActorContext createFollowerActorContextWithLeader() {
1222         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1223         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1224         followerConfig.setElectionTimeoutFactor(10000);
1225         followerActorContext.setConfigParams(followerConfig);
1226         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1227         return followerActorContext;
1228     }
1229
1230     @Test
1231     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1232         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1233
1234         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1235
1236         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1237
1238         Follower follower = new Follower(followerActorContext);
1239         followerActor.underlyingActor().setBehavior(follower);
1240         followerActorContext.setCurrentBehavior(follower);
1241
1242         Map<String, String> peerAddresses = new HashMap<>();
1243         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1244
1245         leaderActorContext.setPeerAddresses(peerAddresses);
1246
1247         leaderActorContext.getReplicatedLog().removeFrom(0);
1248
1249         //create 3 entries
1250         leaderActorContext.setReplicatedLog(
1251                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1252
1253         leaderActorContext.setCommitIndex(1);
1254
1255         followerActorContext.getReplicatedLog().removeFrom(0);
1256
1257         // follower too has the exact same log entries and has the same commit index
1258         followerActorContext.setReplicatedLog(
1259                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1260
1261         followerActorContext.setCommitIndex(1);
1262
1263         leader = new Leader(leaderActorContext);
1264         leaderActorContext.setCurrentBehavior(leader);
1265
1266         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1267
1268         assertEquals(1, appendEntries.getLeaderCommit());
1269         assertEquals(0, appendEntries.getEntries().size());
1270         assertEquals(0, appendEntries.getPrevLogIndex());
1271
1272         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1273                 leaderActor, AppendEntriesReply.class);
1274
1275         assertEquals(2, appendEntriesReply.getLogLastIndex());
1276         assertEquals(1, appendEntriesReply.getLogLastTerm());
1277
1278         // follower returns its next index
1279         assertEquals(2, appendEntriesReply.getLogLastIndex());
1280         assertEquals(1, appendEntriesReply.getLogLastTerm());
1281
1282         follower.close();
1283     }
1284
1285     @Test
1286     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1287         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1288
1289         MockRaftActorContext leaderActorContext = createActorContext();
1290
1291         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1292         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1293
1294         Follower follower = new Follower(followerActorContext);
1295         followerActor.underlyingActor().setBehavior(follower);
1296         followerActorContext.setCurrentBehavior(follower);
1297
1298         Map<String, String> leaderPeerAddresses = new HashMap<>();
1299         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1300
1301         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1302
1303         leaderActorContext.getReplicatedLog().removeFrom(0);
1304
1305         leaderActorContext.setReplicatedLog(
1306                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1307
1308         leaderActorContext.setCommitIndex(1);
1309
1310         followerActorContext.getReplicatedLog().removeFrom(0);
1311
1312         followerActorContext.setReplicatedLog(
1313                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1314
1315         // follower has the same log entries but its commit index > leaders commit index
1316         followerActorContext.setCommitIndex(2);
1317
1318         leader = new Leader(leaderActorContext);
1319
1320         // Initial heartbeat
1321         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1322
1323         assertEquals(1, appendEntries.getLeaderCommit());
1324         assertEquals(0, appendEntries.getEntries().size());
1325         assertEquals(0, appendEntries.getPrevLogIndex());
1326
1327         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1328                 leaderActor, AppendEntriesReply.class);
1329
1330         assertEquals(2, appendEntriesReply.getLogLastIndex());
1331         assertEquals(1, appendEntriesReply.getLogLastTerm());
1332
1333         leaderActor.underlyingActor().setBehavior(follower);
1334         leader.handleMessage(followerActor, appendEntriesReply);
1335
1336         leaderActor.underlyingActor().clear();
1337         followerActor.underlyingActor().clear();
1338
1339         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1340                 TimeUnit.MILLISECONDS);
1341
1342         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1343
1344         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1345
1346         assertEquals(2, appendEntries.getLeaderCommit());
1347         assertEquals(0, appendEntries.getEntries().size());
1348         assertEquals(2, appendEntries.getPrevLogIndex());
1349
1350         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1351
1352         assertEquals(2, appendEntriesReply.getLogLastIndex());
1353         assertEquals(1, appendEntriesReply.getLogLastTerm());
1354
1355         assertEquals(2, followerActorContext.getCommitIndex());
1356
1357         follower.close();
1358     }
1359
1360     @Test
1361     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1362         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1363
1364         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1365         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1366                 new FiniteDuration(1000, TimeUnit.SECONDS));
1367
1368         leaderActorContext.setReplicatedLog(
1369                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1370         long leaderCommitIndex = 2;
1371         leaderActorContext.setCommitIndex(leaderCommitIndex);
1372         leaderActorContext.setLastApplied(leaderCommitIndex);
1373
1374         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1375         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1376
1377         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1378
1379         followerActorContext.setReplicatedLog(
1380                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1381         followerActorContext.setCommitIndex(0);
1382         followerActorContext.setLastApplied(0);
1383
1384         Follower follower = new Follower(followerActorContext);
1385         followerActor.underlyingActor().setBehavior(follower);
1386
1387         leader = new Leader(leaderActorContext);
1388
1389         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1390         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1391
1392         MessageCollectorActor.clearMessages(followerActor);
1393         MessageCollectorActor.clearMessages(leaderActor);
1394
1395         // Verify initial AppendEntries sent with the leader's current commit index.
1396         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1397         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1398         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1399
1400         leaderActor.underlyingActor().setBehavior(leader);
1401
1402         leader.handleMessage(followerActor, appendEntriesReply);
1403
1404         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1405         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1406
1407         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1409         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1410
1411         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1412         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1413                 appendEntries.getEntries().get(0).getData());
1414         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1415         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1416                 appendEntries.getEntries().get(1).getData());
1417
1418         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1419         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1420
1421         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1422
1423         ApplyState applyState = applyStateList.get(0);
1424         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1425         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1426         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1427                 applyState.getReplicatedLogEntry().getData());
1428
1429         applyState = applyStateList.get(1);
1430         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1431         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1432         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1433                 applyState.getReplicatedLogEntry().getData());
1434
1435         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1436         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1437     }
1438
1439     @Test
1440     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1441         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1442
1443         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1444         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1445                 new FiniteDuration(1000, TimeUnit.SECONDS));
1446
1447         leaderActorContext.setReplicatedLog(
1448                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1449         long leaderCommitIndex = 1;
1450         leaderActorContext.setCommitIndex(leaderCommitIndex);
1451         leaderActorContext.setLastApplied(leaderCommitIndex);
1452
1453         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1454         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1455
1456         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1457
1458         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1459         followerActorContext.setCommitIndex(-1);
1460         followerActorContext.setLastApplied(-1);
1461
1462         Follower follower = new Follower(followerActorContext);
1463         followerActor.underlyingActor().setBehavior(follower);
1464         followerActorContext.setCurrentBehavior(follower);
1465
1466         leader = new Leader(leaderActorContext);
1467
1468         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1469         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1470
1471         MessageCollectorActor.clearMessages(followerActor);
1472         MessageCollectorActor.clearMessages(leaderActor);
1473
1474         // Verify initial AppendEntries sent with the leader's current commit index.
1475         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1476         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1477         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1478
1479         leaderActor.underlyingActor().setBehavior(leader);
1480         leaderActorContext.setCurrentBehavior(leader);
1481
1482         leader.handleMessage(followerActor, appendEntriesReply);
1483
1484         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1485         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1486
1487         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1488         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1489         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1490
1491         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1492         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1493                 appendEntries.getEntries().get(0).getData());
1494         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1495         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1496                 appendEntries.getEntries().get(1).getData());
1497
1498         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1499         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1500
1501         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1502
1503         ApplyState applyState = applyStateList.get(0);
1504         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1505         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1506         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1507                 applyState.getReplicatedLogEntry().getData());
1508
1509         applyState = applyStateList.get(1);
1510         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1511         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1512         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1513                 applyState.getReplicatedLogEntry().getData());
1514
1515         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1516         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1517     }
1518
1519     @Test
1520     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1521         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1522
1523         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1524         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1525                 new FiniteDuration(1000, TimeUnit.SECONDS));
1526
1527         leaderActorContext.setReplicatedLog(
1528                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1529         long leaderCommitIndex = 1;
1530         leaderActorContext.setCommitIndex(leaderCommitIndex);
1531         leaderActorContext.setLastApplied(leaderCommitIndex);
1532
1533         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1534         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1535
1536         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1537
1538         followerActorContext.setReplicatedLog(
1539                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1540         followerActorContext.setCommitIndex(-1);
1541         followerActorContext.setLastApplied(-1);
1542
1543         Follower follower = new Follower(followerActorContext);
1544         followerActor.underlyingActor().setBehavior(follower);
1545         followerActorContext.setCurrentBehavior(follower);
1546
1547         leader = new Leader(leaderActorContext);
1548
1549         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1550         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1551
1552         MessageCollectorActor.clearMessages(followerActor);
1553         MessageCollectorActor.clearMessages(leaderActor);
1554
1555         // Verify initial AppendEntries sent with the leader's current commit index.
1556         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1557         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1558         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1559
1560         leaderActor.underlyingActor().setBehavior(leader);
1561         leaderActorContext.setCurrentBehavior(leader);
1562
1563         leader.handleMessage(followerActor, appendEntriesReply);
1564
1565         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1566         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1567
1568         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1569         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1570         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1571
1572         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1573         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1574         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1575                 appendEntries.getEntries().get(0).getData());
1576         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1577         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1578         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1579                 appendEntries.getEntries().get(1).getData());
1580
1581         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1582         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1583
1584         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1585
1586         ApplyState applyState = applyStateList.get(0);
1587         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1588         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1589         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1590                 applyState.getReplicatedLogEntry().getData());
1591
1592         applyState = applyStateList.get(1);
1593         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1594         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1595         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1596                 applyState.getReplicatedLogEntry().getData());
1597
1598         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1599         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1600         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1601     }
1602
1603     @Test
1604     public void testHandleAppendEntriesReplyWithNewerTerm(){
1605         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1606
1607         MockRaftActorContext leaderActorContext = createActorContext();
1608         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1609                 new FiniteDuration(10000, TimeUnit.SECONDS));
1610
1611         leaderActorContext.setReplicatedLog(
1612                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1613
1614         leader = new Leader(leaderActorContext);
1615         leaderActor.underlyingActor().setBehavior(leader);
1616         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1617
1618         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1619
1620         assertEquals(false, appendEntriesReply.isSuccess());
1621         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1622
1623         MessageCollectorActor.clearMessages(leaderActor);
1624     }
1625
1626     @Test
1627     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1628         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1629
1630         MockRaftActorContext leaderActorContext = createActorContext();
1631         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1632                 new FiniteDuration(10000, TimeUnit.SECONDS));
1633
1634         leaderActorContext.setReplicatedLog(
1635                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1636         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1637
1638         leader = new Leader(leaderActorContext);
1639         leaderActor.underlyingActor().setBehavior(leader);
1640         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1641
1642         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1643
1644         assertEquals(false, appendEntriesReply.isSuccess());
1645         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1646
1647         MessageCollectorActor.clearMessages(leaderActor);
1648     }
1649
1650     @Test
1651     public void testHandleAppendEntriesReplySuccess() throws Exception {
1652         logStart("testHandleAppendEntriesReplySuccess");
1653
1654         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1655
1656         leaderActorContext.setReplicatedLog(
1657                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1658
1659         leaderActorContext.setCommitIndex(1);
1660         leaderActorContext.setLastApplied(1);
1661         leaderActorContext.getTermInformation().update(1, "leader");
1662
1663         leader = new Leader(leaderActorContext);
1664
1665         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1666
1667         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1668         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1669
1670         short payloadVersion = 5;
1671         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1672
1673         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1674
1675         assertEquals(RaftState.Leader, raftActorBehavior.state());
1676
1677         assertEquals(2, leaderActorContext.getCommitIndex());
1678
1679         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1680                 leaderActor, ApplyJournalEntries.class);
1681
1682         assertEquals(2, leaderActorContext.getLastApplied());
1683
1684         assertEquals(2, applyJournalEntries.getToIndex());
1685
1686         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1687                 ApplyState.class);
1688
1689         assertEquals(1,applyStateList.size());
1690
1691         ApplyState applyState = applyStateList.get(0);
1692
1693         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1694
1695         assertEquals(2, followerInfo.getMatchIndex());
1696         assertEquals(3, followerInfo.getNextIndex());
1697         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1698         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1699     }
1700
1701     @Test
1702     public void testHandleAppendEntriesReplyUnknownFollower(){
1703         logStart("testHandleAppendEntriesReplyUnknownFollower");
1704
1705         MockRaftActorContext leaderActorContext = createActorContext();
1706
1707         leader = new Leader(leaderActorContext);
1708
1709         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1710
1711         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1712
1713         assertEquals(RaftState.Leader, raftActorBehavior.state());
1714     }
1715
1716     @Test
1717     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1718         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1719
1720         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1721         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1722                 new FiniteDuration(1000, TimeUnit.SECONDS));
1723         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1724
1725         leaderActorContext.setReplicatedLog(
1726                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1727         long leaderCommitIndex = 3;
1728         leaderActorContext.setCommitIndex(leaderCommitIndex);
1729         leaderActorContext.setLastApplied(leaderCommitIndex);
1730
1731         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1732         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1733         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1734         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1735
1736         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1737
1738         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1739         followerActorContext.setCommitIndex(-1);
1740         followerActorContext.setLastApplied(-1);
1741
1742         Follower follower = new Follower(followerActorContext);
1743         followerActor.underlyingActor().setBehavior(follower);
1744         followerActorContext.setCurrentBehavior(follower);
1745
1746         leader = new Leader(leaderActorContext);
1747
1748         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1749         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1750
1751         MessageCollectorActor.clearMessages(followerActor);
1752         MessageCollectorActor.clearMessages(leaderActor);
1753
1754         // Verify initial AppendEntries sent with the leader's current commit index.
1755         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1756         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1757         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1758
1759         leaderActor.underlyingActor().setBehavior(leader);
1760         leaderActorContext.setCurrentBehavior(leader);
1761
1762         leader.handleMessage(followerActor, appendEntriesReply);
1763
1764         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1765         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1766
1767         appendEntries = appendEntriesList.get(0);
1768         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1769         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1770         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1771
1772         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1773         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1774                 appendEntries.getEntries().get(0).getData());
1775         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1776         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1777                 appendEntries.getEntries().get(1).getData());
1778
1779         appendEntries = appendEntriesList.get(1);
1780         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1781         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1782         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1783
1784         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1785         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1786                 appendEntries.getEntries().get(0).getData());
1787         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1788         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1789                 appendEntries.getEntries().get(1).getData());
1790
1791         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1792         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1793
1794         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1795
1796         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1797         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1798     }
1799
1800     @Test
1801     public void testHandleRequestVoteReply(){
1802         logStart("testHandleRequestVoteReply");
1803
1804         MockRaftActorContext leaderActorContext = createActorContext();
1805
1806         leader = new Leader(leaderActorContext);
1807
1808         // Should be a no-op.
1809         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1810                 new RequestVoteReply(1, true));
1811
1812         assertEquals(RaftState.Leader, raftActorBehavior.state());
1813
1814         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1815
1816         assertEquals(RaftState.Leader, raftActorBehavior.state());
1817     }
1818
1819     @Test
1820     public void testIsolatedLeaderCheckNoFollowers() {
1821         logStart("testIsolatedLeaderCheckNoFollowers");
1822
1823         MockRaftActorContext leaderActorContext = createActorContext();
1824
1825         leader = new Leader(leaderActorContext);
1826         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1827         assertTrue(behavior instanceof Leader);
1828     }
1829
1830     @Test
1831     public void testIsolatedLeaderCheckNoVotingFollowers() {
1832         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1833
1834         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1835         Follower follower = new Follower(followerActorContext);
1836         followerActor.underlyingActor().setBehavior(follower);
1837
1838         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1839         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1840                 new FiniteDuration(1000, TimeUnit.SECONDS));
1841         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1842
1843         leader = new Leader(leaderActorContext);
1844         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1845         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1846         assertTrue("Expected Leader", behavior instanceof Leader);
1847     }
1848
1849     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1850         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1851         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1852
1853         MockRaftActorContext leaderActorContext = createActorContext();
1854
1855         Map<String, String> peerAddresses = new HashMap<>();
1856         peerAddresses.put("follower-1", followerActor1.path().toString());
1857         peerAddresses.put("follower-2", followerActor2.path().toString());
1858
1859         leaderActorContext.setPeerAddresses(peerAddresses);
1860         leaderActorContext.setRaftPolicy(raftPolicy);
1861
1862         leader = new Leader(leaderActorContext);
1863
1864         leader.markFollowerActive("follower-1");
1865         leader.markFollowerActive("follower-2");
1866         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1867         assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1868
1869         // kill 1 follower and verify if that got killed
1870         final JavaTestKit probe = new JavaTestKit(getSystem());
1871         probe.watch(followerActor1);
1872         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1873         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1874         assertEquals(termMsg1.getActor(), followerActor1);
1875
1876         leader.markFollowerInActive("follower-1");
1877         leader.markFollowerActive("follower-2");
1878         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1879         assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1880
1881         // kill 2nd follower and leader should change to Isolated leader
1882         followerActor2.tell(PoisonPill.getInstance(), null);
1883         probe.watch(followerActor2);
1884         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1885         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1886         assertEquals(termMsg2.getActor(), followerActor2);
1887
1888         leader.markFollowerInActive("follower-2");
1889         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1890     }
1891
1892     @Test
1893     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1894         logStart("testIsolatedLeaderCheckTwoFollowers");
1895
1896         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1897
1898         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1899             behavior instanceof IsolatedLeader);
1900     }
1901
1902     @Test
1903     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1904         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1905
1906         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1907
1908         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1909                 behavior instanceof Leader);
1910     }
1911
1912     @Test
1913     public void testLaggingFollowerStarvation() throws Exception {
1914         logStart("testLaggingFollowerStarvation");
1915         new JavaTestKit(getSystem()) {{
1916             String leaderActorId = actorFactory.generateActorId("leader");
1917             String follower1ActorId = actorFactory.generateActorId("follower");
1918             String follower2ActorId = actorFactory.generateActorId("follower");
1919
1920             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1921                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1922             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1923             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1924
1925             MockRaftActorContext leaderActorContext =
1926                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1927
1928             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1929             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1930             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1931
1932             leaderActorContext.setConfigParams(configParams);
1933
1934             leaderActorContext.setReplicatedLog(
1935                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1936
1937             Map<String, String> peerAddresses = new HashMap<>();
1938             peerAddresses.put(follower1ActorId,
1939                     follower1Actor.path().toString());
1940             peerAddresses.put(follower2ActorId,
1941                     follower2Actor.path().toString());
1942
1943             leaderActorContext.setPeerAddresses(peerAddresses);
1944             leaderActorContext.getTermInformation().update(1, leaderActorId);
1945
1946             RaftActorBehavior leader = createBehavior(leaderActorContext);
1947
1948             leaderActor.underlyingActor().setBehavior(leader);
1949
1950             for(int i=1;i<6;i++) {
1951                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1952                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1953                 assertTrue(newBehavior == leader);
1954                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1955             }
1956
1957             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1958             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1959
1960             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1961                     heartbeats.size() > 1);
1962
1963             // Check if follower-2 got AppendEntries during this time and was not starved
1964             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1965
1966             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1967                     appendEntries.size() > 1);
1968
1969         }};
1970     }
1971
1972     @Test
1973     public void testReplicationConsensusWithNonVotingFollower() {
1974         logStart("testReplicationConsensusWithNonVotingFollower");
1975
1976         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1977         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1978                 new FiniteDuration(1000, TimeUnit.SECONDS));
1979
1980         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1981         leaderActorContext.setCommitIndex(-1);
1982
1983         String nonVotingFollowerId = "nonvoting-follower";
1984         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1985                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1986
1987         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1988
1989         leader = new Leader(leaderActorContext);
1990         leaderActorContext.setCurrentBehavior(leader);
1991
1992         // Ignore initial heartbeats
1993         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1994         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1995
1996         MessageCollectorActor.clearMessages(followerActor);
1997         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1998         MessageCollectorActor.clearMessages(leaderActor);
1999
2000         // Send a Replicate message and wait for AppendEntries.
2001         sendReplicate(leaderActorContext, 0);
2002
2003         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2004         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2005
2006         // Send reply only from the voting follower and verify consensus via ApplyState.
2007         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2008
2009         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2010
2011         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2012
2013         MessageCollectorActor.clearMessages(followerActor);
2014         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2015         MessageCollectorActor.clearMessages(leaderActor);
2016
2017         // Send another Replicate message
2018         sendReplicate(leaderActorContext, 1);
2019
2020         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2021         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2022                 AppendEntries.class);
2023         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2024         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2025
2026         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2027         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2028
2029         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2030
2031         // Send reply from the voting follower and verify consensus.
2032         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2033
2034         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2035     }
2036
2037     @Test
2038     public void testTransferLeadershipWithFollowerInSync() {
2039         logStart("testTransferLeadershipWithFollowerInSync");
2040
2041         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2042         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2043                 new FiniteDuration(1000, TimeUnit.SECONDS));
2044         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2045
2046         leader = new Leader(leaderActorContext);
2047         leaderActorContext.setCurrentBehavior(leader);
2048
2049         // Initial heartbeat
2050         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2051         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2052         MessageCollectorActor.clearMessages(followerActor);
2053
2054         sendReplicate(leaderActorContext, 0);
2055         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2056
2057         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2058         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2059         MessageCollectorActor.clearMessages(followerActor);
2060
2061         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2062         leader.transferLeadership(mockTransferCohort);
2063
2064         verify(mockTransferCohort, never()).transferComplete();
2065         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2067
2068         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2069         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2070
2071         // Leader should force an election timeout
2072         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2073
2074         verify(mockTransferCohort).transferComplete();
2075     }
2076
2077     @Test
2078     public void testTransferLeadershipWithEmptyLog() {
2079         logStart("testTransferLeadershipWithEmptyLog");
2080
2081         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2082         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2083                 new FiniteDuration(1000, TimeUnit.SECONDS));
2084         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2085
2086         leader = new Leader(leaderActorContext);
2087         leaderActorContext.setCurrentBehavior(leader);
2088
2089         // Initial heartbeat
2090         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2091         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2092         MessageCollectorActor.clearMessages(followerActor);
2093
2094         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2095         leader.transferLeadership(mockTransferCohort);
2096
2097         verify(mockTransferCohort, never()).transferComplete();
2098         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100
2101         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2102         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2103
2104         // Leader should force an election timeout
2105         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2106
2107         verify(mockTransferCohort).transferComplete();
2108     }
2109
2110     @Test
2111     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2112         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2113
2114         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2115         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2116                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2117
2118         leader = new Leader(leaderActorContext);
2119         leaderActorContext.setCurrentBehavior(leader);
2120
2121         // Initial heartbeat
2122         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2123         MessageCollectorActor.clearMessages(followerActor);
2124
2125         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2126         leader.transferLeadership(mockTransferCohort);
2127
2128         verify(mockTransferCohort, never()).transferComplete();
2129
2130         // Sync up the follower.
2131         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2132         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2133         MessageCollectorActor.clearMessages(followerActor);
2134
2135         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2136                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2137         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2138         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2139         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2140
2141         // Leader should force an election timeout
2142         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2143
2144         verify(mockTransferCohort).transferComplete();
2145     }
2146
2147     @Test
2148     public void testTransferLeadershipWithFollowerSyncTimeout() {
2149         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2150
2151         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2152         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2153                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2154         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2155         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2156
2157         leader = new Leader(leaderActorContext);
2158         leaderActorContext.setCurrentBehavior(leader);
2159
2160         // Initial heartbeat
2161         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2162         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2163         MessageCollectorActor.clearMessages(followerActor);
2164
2165         sendReplicate(leaderActorContext, 0);
2166         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2167
2168         MessageCollectorActor.clearMessages(followerActor);
2169
2170         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2171         leader.transferLeadership(mockTransferCohort);
2172
2173         verify(mockTransferCohort, never()).transferComplete();
2174
2175         // Send heartbeats to time out the transfer.
2176         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2177             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2178                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2179             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2180         }
2181
2182         verify(mockTransferCohort).abortTransfer();
2183         verify(mockTransferCohort, never()).transferComplete();
2184         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2185     }
2186
2187     @Override
2188     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2189             ActorRef actorRef, RaftRPC rpc) throws Exception {
2190         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2191         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2192     }
2193
2194     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2195
2196         private final long electionTimeOutIntervalMillis;
2197         private final int snapshotChunkSize;
2198
2199         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2200             super();
2201             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2202             this.snapshotChunkSize = snapshotChunkSize;
2203         }
2204
2205         @Override
2206         public FiniteDuration getElectionTimeOutInterval() {
2207             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2208         }
2209
2210         @Override
2211         public int getSnapshotChunkSize() {
2212             return snapshotChunkSize;
2213         }
2214     }
2215 }