41278799ed934ae95c9b7d3dee4f974892c4462e
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
60 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
65 import org.opendaylight.yangtools.concepts.Identifier;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class LeaderTest extends AbstractLeaderTest<Leader> {
69
70     static final String FOLLOWER_ID = "follower";
71     public static final String LEADER_ID = "leader";
72
73     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
74             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
75
76     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
78
79     private Leader leader;
80     private final short payloadVersion = 5;
81
82     @Override
83     @After
84     public void tearDown() throws Exception {
85         if(leader != null) {
86             leader.close();
87         }
88
89         super.tearDown();
90     }
91
92     @Test
93     public void testHandleMessageForUnknownMessage() throws Exception {
94         logStart("testHandleMessageForUnknownMessage");
95
96         leader = new Leader(createActorContext());
97
98         // handle message should null when it receives an unknown message
99         assertNull(leader.handleMessage(followerActor, "foo"));
100     }
101
102     @Test
103     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
105
106         MockRaftActorContext actorContext = createActorContextWithFollower();
107         actorContext.setCommitIndex(-1);
108         short payloadVersion = (short)5;
109         actorContext.setPayloadVersion(payloadVersion);
110
111         long term = 1;
112         actorContext.getTermInformation().update(term, "");
113
114         leader = new Leader(actorContext);
115         actorContext.setCurrentBehavior(leader);
116
117         // Leader should send an immediate heartbeat with no entries as follower is inactive.
118         long lastIndex = actorContext.getReplicatedLog().lastIndex();
119         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120         assertEquals("getTerm", term, appendEntries.getTerm());
121         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
122         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
123         assertEquals("Entries size", 0, appendEntries.getEntries().size());
124         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
125
126         // The follower would normally reply - simulate that explicitly here.
127         leader.handleMessage(followerActor, new AppendEntriesReply(
128                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
129         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
130
131         followerActor.underlyingActor().clear();
132
133         // Sleep for the heartbeat interval so AppendEntries is sent.
134         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
135                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
136
137         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
138
139         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
140         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
141         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
142         assertEquals("Entries size", 1, appendEntries.getEntries().size());
143         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
144         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
145         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
146     }
147
148
149     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
150         return sendReplicate(actorContext, 1, index);
151     }
152
153     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
154         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156                 term, index, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
159     }
160
161     @Test
162     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
163         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
164
165         MockRaftActorContext actorContext = createActorContextWithFollower();
166
167         long term = 1;
168         actorContext.getTermInformation().update(term, "");
169
170         leader = new Leader(actorContext);
171
172         // Leader will send an immediate heartbeat - ignore it.
173         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
174
175         // The follower would normally reply - simulate that explicitly here.
176         long lastIndex = actorContext.getReplicatedLog().lastIndex();
177         leader.handleMessage(followerActor, new AppendEntriesReply(
178                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
179         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
180
181         followerActor.underlyingActor().clear();
182
183         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
184
185         // State should not change
186         assertTrue(raftBehavior instanceof Leader);
187
188         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
190         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
191         assertEquals("Entries size", 1, appendEntries.getEntries().size());
192         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
193         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
194         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
195         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
196     }
197
198     @Test
199     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
200         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
201
202         MockRaftActorContext actorContext = createActorContextWithFollower();
203         actorContext.setCommitIndex(-1);
204
205         // The raft context is initialized with a couple log entries. However the commitIndex
206         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
207         // committed and applied. Now it regains leadership with a higher term (2).
208         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
209         long newTerm = prevTerm + 1;
210         actorContext.getTermInformation().update(newTerm, "");
211
212         leader = new Leader(actorContext);
213         actorContext.setCurrentBehavior(leader);
214
215         // Leader will send an immediate heartbeat - ignore it.
216         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
217
218         // The follower replies with the leader's current last index and term, simulating that it is
219         // up to date with the leader.
220         long lastIndex = actorContext.getReplicatedLog().lastIndex();
221         leader.handleMessage(followerActor, new AppendEntriesReply(
222                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
223
224         // The commit index should not get updated even though consensus was reached. This is b/c the
225         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
226         // from previous terms by counting replicas".
227         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
228
229         followerActor.underlyingActor().clear();
230
231         // Now replicate a new entry with the new term 2.
232         long newIndex = lastIndex + 1;
233         sendReplicate(actorContext, newTerm, newIndex);
234
235         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
237         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
238         assertEquals("Entries size", 1, appendEntries.getEntries().size());
239         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
240         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
241         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
242
243         // The follower replies with success. The leader should now update the commit index to the new index
244         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
245         // prior entries are committed indirectly".
246         leader.handleMessage(followerActor, new AppendEntriesReply(
247                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
248
249         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
250     }
251
252     @Test
253     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
254         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
255
256         MockRaftActorContext actorContext = createActorContextWithFollower();
257         actorContext.setRaftPolicy(createRaftPolicy(true, true));
258
259         long term = 1;
260         actorContext.getTermInformation().update(term, "");
261
262         leader = new Leader(actorContext);
263
264         // Leader will send an immediate heartbeat - ignore it.
265         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
266
267         // The follower would normally reply - simulate that explicitly here.
268         long lastIndex = actorContext.getReplicatedLog().lastIndex();
269         leader.handleMessage(followerActor, new AppendEntriesReply(
270                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
271         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
272
273         followerActor.underlyingActor().clear();
274
275         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
276
277         // State should not change
278         assertTrue(raftBehavior instanceof Leader);
279
280         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
282         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
283         assertEquals("Entries size", 1, appendEntries.getEntries().size());
284         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
285         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
286         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
287         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
288     }
289
290     @Test
291     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
292         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
293
294         MockRaftActorContext actorContext = createActorContextWithFollower();
295         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
296             @Override
297             public FiniteDuration getHeartBeatInterval() {
298                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
299             }
300         });
301
302         long term = 1;
303         actorContext.getTermInformation().update(term, "");
304
305         leader = new Leader(actorContext);
306
307         // Leader will send an immediate heartbeat - ignore it.
308         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
309
310         // The follower would normally reply - simulate that explicitly here.
311         long lastIndex = actorContext.getReplicatedLog().lastIndex();
312         leader.handleMessage(followerActor, new AppendEntriesReply(
313                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
314         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
315
316         followerActor.underlyingActor().clear();
317
318         for(int i=0;i<5;i++) {
319             sendReplicate(actorContext, lastIndex+i+1);
320         }
321
322         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
323         // We expect only 1 message to be sent because of two reasons,
324         // - an append entries reply was not received
325         // - the heartbeat interval has not expired
326         // In this scenario if multiple messages are sent they would likely be duplicates
327         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
328     }
329
330     @Test
331     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
332         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
333
334         MockRaftActorContext actorContext = createActorContextWithFollower();
335         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
336             @Override
337             public FiniteDuration getHeartBeatInterval() {
338                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
339             }
340         });
341
342         long term = 1;
343         actorContext.getTermInformation().update(term, "");
344
345         leader = new Leader(actorContext);
346
347         // Leader will send an immediate heartbeat - ignore it.
348         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
349
350         // The follower would normally reply - simulate that explicitly here.
351         long lastIndex = actorContext.getReplicatedLog().lastIndex();
352         leader.handleMessage(followerActor, new AppendEntriesReply(
353                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
354         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
355
356         followerActor.underlyingActor().clear();
357
358         for(int i=0;i<3;i++) {
359             sendReplicate(actorContext, lastIndex+i+1);
360             leader.handleMessage(followerActor, new AppendEntriesReply(
361                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
362
363         }
364
365         for(int i=3;i<5;i++) {
366             sendReplicate(actorContext, lastIndex + i + 1);
367         }
368
369         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
370         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
371         // get sent to the follower - but not the 5th
372         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
373
374         for(int i=0;i<4;i++) {
375             long expected = allMessages.get(i).getEntries().get(0).getIndex();
376             assertEquals(expected, i+2);
377         }
378     }
379
380     @Test
381     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
382         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
383
384         MockRaftActorContext actorContext = createActorContextWithFollower();
385         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
386             @Override
387             public FiniteDuration getHeartBeatInterval() {
388                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
389             }
390         });
391
392         long term = 1;
393         actorContext.getTermInformation().update(term, "");
394
395         leader = new Leader(actorContext);
396
397         // Leader will send an immediate heartbeat - ignore it.
398         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
399
400         // The follower would normally reply - simulate that explicitly here.
401         long lastIndex = actorContext.getReplicatedLog().lastIndex();
402         leader.handleMessage(followerActor, new AppendEntriesReply(
403                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
404         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
405
406         followerActor.underlyingActor().clear();
407
408         sendReplicate(actorContext, lastIndex+1);
409
410         // Wait slightly longer than heartbeat duration
411         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
412
413         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
414
415         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
416         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
417
418         assertEquals(1, allMessages.get(0).getEntries().size());
419         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
420         assertEquals(1, allMessages.get(1).getEntries().size());
421         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
422
423     }
424
425     @Test
426     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
427         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
428
429         MockRaftActorContext actorContext = createActorContextWithFollower();
430         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
431             @Override
432             public FiniteDuration getHeartBeatInterval() {
433                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
434             }
435         });
436
437         long term = 1;
438         actorContext.getTermInformation().update(term, "");
439
440         leader = new Leader(actorContext);
441
442         // Leader will send an immediate heartbeat - ignore it.
443         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
444
445         // The follower would normally reply - simulate that explicitly here.
446         long lastIndex = actorContext.getReplicatedLog().lastIndex();
447         leader.handleMessage(followerActor, new AppendEntriesReply(
448                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
449         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
450
451         followerActor.underlyingActor().clear();
452
453         for(int i=0;i<3;i++) {
454             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
455             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
456         }
457
458         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
459         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
460     }
461
462     @Test
463     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
464         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
465
466         MockRaftActorContext actorContext = createActorContextWithFollower();
467         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
468             @Override
469             public FiniteDuration getHeartBeatInterval() {
470                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
471             }
472         });
473
474         long term = 1;
475         actorContext.getTermInformation().update(term, "");
476
477         leader = new Leader(actorContext);
478
479         // Leader will send an immediate heartbeat - ignore it.
480         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
481
482         // The follower would normally reply - simulate that explicitly here.
483         long lastIndex = actorContext.getReplicatedLog().lastIndex();
484         leader.handleMessage(followerActor, new AppendEntriesReply(
485                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
486         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
487
488         followerActor.underlyingActor().clear();
489
490         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
492         sendReplicate(actorContext, lastIndex+1);
493
494         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
496
497         assertEquals(0, allMessages.get(0).getEntries().size());
498         assertEquals(1, allMessages.get(1).getEntries().size());
499     }
500
501
502     @Test
503     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
504         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
505
506         MockRaftActorContext actorContext = createActorContext();
507
508         leader = new Leader(actorContext);
509
510         actorContext.setLastApplied(0);
511
512         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
513         long term = actorContext.getTermInformation().getCurrentTerm();
514         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
515                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
516
517         actorContext.getReplicatedLog().append(newEntry);
518
519         final Identifier id = new MockIdentifier("state-id");
520         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
521
522         // State should not change
523         assertTrue(raftBehavior instanceof Leader);
524
525         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
526
527         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
528         // one since lastApplied state is 0.
529         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
530                 leaderActor, ApplyState.class);
531         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
532
533         for(int i = 0; i <= newLogIndex - 1; i++ ) {
534             ApplyState applyState = applyStateList.get(i);
535             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
536             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
537         }
538
539         ApplyState last = applyStateList.get((int) newLogIndex - 1);
540         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
541         assertEquals("getIdentifier", id, last.getIdentifier());
542     }
543
544     @Test
545     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
546         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
547
548         MockRaftActorContext actorContext = createActorContextWithFollower();
549
550         Map<String, String> leadersSnapshot = new HashMap<>();
551         leadersSnapshot.put("1", "A");
552         leadersSnapshot.put("2", "B");
553         leadersSnapshot.put("3", "C");
554
555         //clears leaders log
556         actorContext.getReplicatedLog().removeFrom(0);
557
558         final int commitIndex = 3;
559         final int snapshotIndex = 2;
560         final int newEntryIndex = 4;
561         final int snapshotTerm = 1;
562         final int currentTerm = 2;
563
564         // set the snapshot variables in replicatedlog
565         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567         actorContext.setCommitIndex(commitIndex);
568         //set follower timeout to 2 mins, helps during debugging
569         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
570
571         leader = new Leader(actorContext);
572
573         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
574         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
575
576         // new entry
577         ReplicatedLogImplEntry entry =
578                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579                         new MockRaftActorContext.MockPayload("D"));
580
581         //update follower timestamp
582         leader.markFollowerActive(FOLLOWER_ID);
583
584         ByteString bs = toByteString(leadersSnapshot);
585         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
586                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
587         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
588                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
589         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
590
591         //send first chunk and no InstallSnapshotReply received yet
592         fts.getNextChunk();
593         fts.incrementChunkIndex();
594
595         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
596                 TimeUnit.MILLISECONDS);
597
598         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
599
600         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
601
602         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
603
604         //InstallSnapshotReply received
605         fts.markSendStatus(true);
606
607         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
608
609         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
610
611         assertEquals(commitIndex, is.getLastIncludedIndex());
612     }
613
614     @Test
615     public void testSendAppendEntriesSnapshotScenario() throws Exception {
616         logStart("testSendAppendEntriesSnapshotScenario");
617
618         MockRaftActorContext actorContext = createActorContextWithFollower();
619
620         Map<String, String> leadersSnapshot = new HashMap<>();
621         leadersSnapshot.put("1", "A");
622         leadersSnapshot.put("2", "B");
623         leadersSnapshot.put("3", "C");
624
625         //clears leaders log
626         actorContext.getReplicatedLog().removeFrom(0);
627
628         final int followersLastIndex = 2;
629         final int snapshotIndex = 3;
630         final int newEntryIndex = 4;
631         final int snapshotTerm = 1;
632         final int currentTerm = 2;
633
634         // set the snapshot variables in replicatedlog
635         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
636         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
637         actorContext.setCommitIndex(followersLastIndex);
638
639         leader = new Leader(actorContext);
640
641         // Leader will send an immediate heartbeat - ignore it.
642         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
643
644         // new entry
645         ReplicatedLogImplEntry entry =
646                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
647                         new MockRaftActorContext.MockPayload("D"));
648
649         actorContext.getReplicatedLog().append(entry);
650
651         //update follower timestamp
652         leader.markFollowerActive(FOLLOWER_ID);
653
654         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
655         RaftActorBehavior raftBehavior = leader.handleMessage(
656                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
657
658         assertTrue(raftBehavior instanceof Leader);
659
660         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
661     }
662
663     @Test
664     public void testInitiateInstallSnapshot() throws Exception {
665         logStart("testInitiateInstallSnapshot");
666
667         MockRaftActorContext actorContext = createActorContextWithFollower();
668
669         //clears leaders log
670         actorContext.getReplicatedLog().removeFrom(0);
671
672         final int followersLastIndex = 2;
673         final int snapshotIndex = 3;
674         final int newEntryIndex = 4;
675         final int snapshotTerm = 1;
676         final int currentTerm = 2;
677
678         // set the snapshot variables in replicatedlog
679         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681         actorContext.setLastApplied(3);
682         actorContext.setCommitIndex(followersLastIndex);
683
684         leader = new Leader(actorContext);
685
686         // Leader will send an immediate heartbeat - ignore it.
687         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
688
689         // set the snapshot as absent and check if capture-snapshot is invoked.
690         leader.setSnapshot(null);
691
692         // new entry
693         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
694                 new MockRaftActorContext.MockPayload("D"));
695
696         actorContext.getReplicatedLog().append(entry);
697
698         //update follower timestamp
699         leader.markFollowerActive(FOLLOWER_ID);
700
701         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
702
703         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
704
705         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
706
707         assertTrue(cs.isInstallSnapshotInitiated());
708         assertEquals(3, cs.getLastAppliedIndex());
709         assertEquals(1, cs.getLastAppliedTerm());
710         assertEquals(4, cs.getLastIndex());
711         assertEquals(2, cs.getLastTerm());
712
713         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
714         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
715
716         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
717     }
718
719     @Test
720     public void testInitiateForceInstallSnapshot() throws Exception {
721         logStart("testInitiateForceInstallSnapshot");
722
723         MockRaftActorContext actorContext = createActorContextWithFollower();
724
725         final int followersLastIndex = 2;
726         final int snapshotIndex = -1;
727         final int newEntryIndex = 4;
728         final int snapshotTerm = -1;
729         final int currentTerm = 2;
730
731         // set the snapshot variables in replicatedlog
732         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734         actorContext.setLastApplied(3);
735         actorContext.setCommitIndex(followersLastIndex);
736
737         actorContext.getReplicatedLog().removeFrom(0);
738
739         leader = new Leader(actorContext);
740
741         // Leader will send an immediate heartbeat - ignore it.
742         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
743
744         // set the snapshot as absent and check if capture-snapshot is invoked.
745         leader.setSnapshot(null);
746
747         for(int i=0;i<4;i++) {
748             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
749                     new MockRaftActorContext.MockPayload("X" + i)));
750         }
751
752         // new entry
753         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
754                 new MockRaftActorContext.MockPayload("D"));
755
756         actorContext.getReplicatedLog().append(entry);
757
758         //update follower timestamp
759         leader.markFollowerActive(FOLLOWER_ID);
760
761         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
762         // installed with a SendInstallSnapshot
763         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
764
765         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
766
767         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
768
769         assertTrue(cs.isInstallSnapshotInitiated());
770         assertEquals(3, cs.getLastAppliedIndex());
771         assertEquals(1, cs.getLastAppliedTerm());
772         assertEquals(4, cs.getLastIndex());
773         assertEquals(2, cs.getLastTerm());
774
775         // if an initiate is started again when first is in progress, it should not initiate Capture
776         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
777
778         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
779     }
780
781
782     @Test
783     public void testInstallSnapshot() throws Exception {
784         logStart("testInstallSnapshot");
785
786         MockRaftActorContext actorContext = createActorContextWithFollower();
787
788         Map<String, String> leadersSnapshot = new HashMap<>();
789         leadersSnapshot.put("1", "A");
790         leadersSnapshot.put("2", "B");
791         leadersSnapshot.put("3", "C");
792
793         //clears leaders log
794         actorContext.getReplicatedLog().removeFrom(0);
795
796         final int lastAppliedIndex = 3;
797         final int snapshotIndex = 2;
798         final int snapshotTerm = 1;
799         final int currentTerm = 2;
800
801         // set the snapshot variables in replicatedlog
802         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805         actorContext.setCommitIndex(lastAppliedIndex);
806         actorContext.setLastApplied(lastAppliedIndex);
807
808         leader = new Leader(actorContext);
809
810         // Initial heartbeat.
811         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
812
813         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
814         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
815
816         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
817                 Collections.<ReplicatedLogEntry>emptyList(),
818                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
819
820         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
821
822         assertTrue(raftBehavior instanceof Leader);
823
824         // check if installsnapshot gets called with the correct values.
825
826         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
827
828         assertNotNull(installSnapshot.getData());
829         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
830         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
831
832         assertEquals(currentTerm, installSnapshot.getTerm());
833     }
834
835     @Test
836     public void testForceInstallSnapshot() throws Exception {
837         logStart("testForceInstallSnapshot");
838
839         MockRaftActorContext actorContext = createActorContextWithFollower();
840
841         Map<String, String> leadersSnapshot = new HashMap<>();
842         leadersSnapshot.put("1", "A");
843         leadersSnapshot.put("2", "B");
844         leadersSnapshot.put("3", "C");
845
846         final int lastAppliedIndex = 3;
847         final int snapshotIndex = -1;
848         final int snapshotTerm = -1;
849         final int currentTerm = 2;
850
851         // set the snapshot variables in replicatedlog
852         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
853         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
854         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
855         actorContext.setCommitIndex(lastAppliedIndex);
856         actorContext.setLastApplied(lastAppliedIndex);
857
858         leader = new Leader(actorContext);
859
860         // Initial heartbeat.
861         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
862
863         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
864         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
865
866         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
867                 Collections.<ReplicatedLogEntry>emptyList(),
868                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
869
870         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
871
872         assertTrue(raftBehavior instanceof Leader);
873
874         // check if installsnapshot gets called with the correct values.
875
876         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
877
878         assertNotNull(installSnapshot.getData());
879         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
880         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
881
882         assertEquals(currentTerm, installSnapshot.getTerm());
883     }
884
885     @Test
886     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
887         logStart("testHandleInstallSnapshotReplyLastChunk");
888
889         MockRaftActorContext actorContext = createActorContextWithFollower();
890
891         final int commitIndex = 3;
892         final int snapshotIndex = 2;
893         final int snapshotTerm = 1;
894         final int currentTerm = 2;
895
896         actorContext.setCommitIndex(commitIndex);
897
898         leader = new Leader(actorContext);
899         actorContext.setCurrentBehavior(leader);
900
901         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
902         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
903
904         // Ignore initial heartbeat.
905         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
906
907         Map<String, String> leadersSnapshot = new HashMap<>();
908         leadersSnapshot.put("1", "A");
909         leadersSnapshot.put("2", "B");
910         leadersSnapshot.put("3", "C");
911
912         // set the snapshot variables in replicatedlog
913
914         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
915         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
916         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917
918         ByteString bs = toByteString(leadersSnapshot);
919         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
920                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
921         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
922                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
923         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
924         while(!fts.isLastChunk(fts.getChunkIndex())) {
925             fts.getNextChunk();
926             fts.incrementChunkIndex();
927         }
928
929         //clears leaders log
930         actorContext.getReplicatedLog().removeFrom(0);
931
932         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
933                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
934
935         assertTrue(raftBehavior instanceof Leader);
936
937         assertEquals(0, leader.followerSnapshotSize());
938         assertEquals(1, leader.followerLogSize());
939         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
940         assertNotNull(fli);
941         assertEquals(commitIndex, fli.getMatchIndex());
942         assertEquals(commitIndex + 1, fli.getNextIndex());
943     }
944
945     @Test
946     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
947         logStart("testSendSnapshotfromInstallSnapshotReply");
948
949         MockRaftActorContext actorContext = createActorContextWithFollower();
950
951         final int commitIndex = 3;
952         final int snapshotIndex = 2;
953         final int snapshotTerm = 1;
954         final int currentTerm = 2;
955
956         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
957             @Override
958             public int getSnapshotChunkSize() {
959                 return 50;
960             }
961         };
962         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
963         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
964
965         actorContext.setConfigParams(configParams);
966         actorContext.setCommitIndex(commitIndex);
967
968         leader = new Leader(actorContext);
969         actorContext.setCurrentBehavior(leader);
970
971         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
972         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
973
974         Map<String, String> leadersSnapshot = new HashMap<>();
975         leadersSnapshot.put("1", "A");
976         leadersSnapshot.put("2", "B");
977         leadersSnapshot.put("3", "C");
978
979         // set the snapshot variables in replicatedlog
980         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
981         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
982         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
983
984         ByteString bs = toByteString(leadersSnapshot);
985         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
986                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
987         leader.setSnapshot(snapshot);
988
989         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
990
991         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
992
993         assertEquals(1, installSnapshot.getChunkIndex());
994         assertEquals(3, installSnapshot.getTotalChunks());
995
996         followerActor.underlyingActor().clear();
997         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
998                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
999
1000         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1001
1002         assertEquals(2, installSnapshot.getChunkIndex());
1003         assertEquals(3, installSnapshot.getTotalChunks());
1004
1005         followerActor.underlyingActor().clear();
1006         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1007                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1008
1009         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1010
1011         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1012         followerActor.underlyingActor().clear();
1013         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1014                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1015
1016         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1017
1018         assertNull(installSnapshot);
1019     }
1020
1021
1022     @Test
1023     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1024         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1025
1026         MockRaftActorContext actorContext = createActorContextWithFollower();
1027
1028         final int commitIndex = 3;
1029         final int snapshotIndex = 2;
1030         final int snapshotTerm = 1;
1031         final int currentTerm = 2;
1032
1033         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1034             @Override
1035             public int getSnapshotChunkSize() {
1036                 return 50;
1037             }
1038         });
1039
1040         actorContext.setCommitIndex(commitIndex);
1041
1042         leader = new Leader(actorContext);
1043
1044         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1045         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1046
1047         Map<String, String> leadersSnapshot = new HashMap<>();
1048         leadersSnapshot.put("1", "A");
1049         leadersSnapshot.put("2", "B");
1050         leadersSnapshot.put("3", "C");
1051
1052         // set the snapshot variables in replicatedlog
1053         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1054         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1055         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1056
1057         ByteString bs = toByteString(leadersSnapshot);
1058         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1059                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1060         leader.setSnapshot(snapshot);
1061
1062         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1063         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1064
1065         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1066
1067         assertEquals(1, installSnapshot.getChunkIndex());
1068         assertEquals(3, installSnapshot.getTotalChunks());
1069
1070         followerActor.underlyingActor().clear();
1071
1072         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1073                 FOLLOWER_ID, -1, false));
1074
1075         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076                 TimeUnit.MILLISECONDS);
1077
1078         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1079
1080         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1081
1082         assertEquals(1, installSnapshot.getChunkIndex());
1083         assertEquals(3, installSnapshot.getTotalChunks());
1084     }
1085
1086     @Test
1087     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1088         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1089
1090         MockRaftActorContext actorContext = createActorContextWithFollower();
1091
1092         final int commitIndex = 3;
1093         final int snapshotIndex = 2;
1094         final int snapshotTerm = 1;
1095         final int currentTerm = 2;
1096
1097         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1098             @Override
1099             public int getSnapshotChunkSize() {
1100                 return 50;
1101             }
1102         });
1103
1104         actorContext.setCommitIndex(commitIndex);
1105
1106         leader = new Leader(actorContext);
1107
1108         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1109         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1110
1111         Map<String, String> leadersSnapshot = new HashMap<>();
1112         leadersSnapshot.put("1", "A");
1113         leadersSnapshot.put("2", "B");
1114         leadersSnapshot.put("3", "C");
1115
1116         // set the snapshot variables in replicatedlog
1117         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1118         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1119         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1120
1121         ByteString bs = toByteString(leadersSnapshot);
1122         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1123                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1124         leader.setSnapshot(snapshot);
1125
1126         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1127
1128         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1129
1130         assertEquals(1, installSnapshot.getChunkIndex());
1131         assertEquals(3, installSnapshot.getTotalChunks());
1132         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1133                 installSnapshot.getLastChunkHashCode().get().intValue());
1134
1135         int hashCode = Arrays.hashCode(installSnapshot.getData());
1136
1137         followerActor.underlyingActor().clear();
1138
1139         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1140                 FOLLOWER_ID, 1, true));
1141
1142         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1143
1144         assertEquals(2, installSnapshot.getChunkIndex());
1145         assertEquals(3, installSnapshot.getTotalChunks());
1146         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1147     }
1148
1149     @Test
1150     public void testFollowerToSnapshotLogic() {
1151         logStart("testFollowerToSnapshotLogic");
1152
1153         MockRaftActorContext actorContext = createActorContext();
1154
1155         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1156             @Override
1157             public int getSnapshotChunkSize() {
1158                 return 50;
1159             }
1160         });
1161
1162         leader = new Leader(actorContext);
1163
1164         Map<String, String> leadersSnapshot = new HashMap<>();
1165         leadersSnapshot.put("1", "A");
1166         leadersSnapshot.put("2", "B");
1167         leadersSnapshot.put("3", "C");
1168
1169         ByteString bs = toByteString(leadersSnapshot);
1170         byte[] barray = bs.toByteArray();
1171
1172         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
1173                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
1174         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1175
1176         assertEquals(bs.size(), barray.length);
1177
1178         int chunkIndex=0;
1179         for (int i=0; i < barray.length; i = i + 50) {
1180             int j = i + 50;
1181             chunkIndex++;
1182
1183             if (i + 50 > barray.length) {
1184                 j = barray.length;
1185             }
1186
1187             byte[] chunk = fts.getNextChunk();
1188             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1189             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1190
1191             fts.markSendStatus(true);
1192             if (!fts.isLastChunk(chunkIndex)) {
1193                 fts.incrementChunkIndex();
1194             }
1195         }
1196
1197         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1198     }
1199
1200     @Override
1201     protected Leader createBehavior(final RaftActorContext actorContext) {
1202         return new Leader(actorContext);
1203     }
1204
1205     @Override
1206     protected MockRaftActorContext createActorContext() {
1207         return createActorContext(leaderActor);
1208     }
1209
1210     @Override
1211     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1212         return createActorContext(LEADER_ID, actorRef);
1213     }
1214
1215     private MockRaftActorContext createActorContextWithFollower() {
1216         MockRaftActorContext actorContext = createActorContext();
1217         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1218                 followerActor.path().toString()).build());
1219         return actorContext;
1220     }
1221
1222     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1223         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1224         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1225         configParams.setElectionTimeoutFactor(100000);
1226         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1227         context.setConfigParams(configParams);
1228         context.setPayloadVersion(payloadVersion);
1229         return context;
1230     }
1231
1232     private MockRaftActorContext createFollowerActorContextWithLeader() {
1233         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1234         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1235         followerConfig.setElectionTimeoutFactor(10000);
1236         followerActorContext.setConfigParams(followerConfig);
1237         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1238         return followerActorContext;
1239     }
1240
1241     @Test
1242     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1243         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1244
1245         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1246
1247         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1248
1249         Follower follower = new Follower(followerActorContext);
1250         followerActor.underlyingActor().setBehavior(follower);
1251         followerActorContext.setCurrentBehavior(follower);
1252
1253         Map<String, String> peerAddresses = new HashMap<>();
1254         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1255
1256         leaderActorContext.setPeerAddresses(peerAddresses);
1257
1258         leaderActorContext.getReplicatedLog().removeFrom(0);
1259
1260         //create 3 entries
1261         leaderActorContext.setReplicatedLog(
1262                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1263
1264         leaderActorContext.setCommitIndex(1);
1265
1266         followerActorContext.getReplicatedLog().removeFrom(0);
1267
1268         // follower too has the exact same log entries and has the same commit index
1269         followerActorContext.setReplicatedLog(
1270                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1271
1272         followerActorContext.setCommitIndex(1);
1273
1274         leader = new Leader(leaderActorContext);
1275         leaderActorContext.setCurrentBehavior(leader);
1276
1277         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1278
1279         assertEquals(1, appendEntries.getLeaderCommit());
1280         assertEquals(0, appendEntries.getEntries().size());
1281         assertEquals(0, appendEntries.getPrevLogIndex());
1282
1283         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1284                 leaderActor, AppendEntriesReply.class);
1285
1286         assertEquals(2, appendEntriesReply.getLogLastIndex());
1287         assertEquals(1, appendEntriesReply.getLogLastTerm());
1288
1289         // follower returns its next index
1290         assertEquals(2, appendEntriesReply.getLogLastIndex());
1291         assertEquals(1, appendEntriesReply.getLogLastTerm());
1292
1293         follower.close();
1294     }
1295
1296     @Test
1297     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1298         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1299
1300         MockRaftActorContext leaderActorContext = createActorContext();
1301
1302         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1303         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1304
1305         Follower follower = new Follower(followerActorContext);
1306         followerActor.underlyingActor().setBehavior(follower);
1307         followerActorContext.setCurrentBehavior(follower);
1308
1309         Map<String, String> leaderPeerAddresses = new HashMap<>();
1310         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1311
1312         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1313
1314         leaderActorContext.getReplicatedLog().removeFrom(0);
1315
1316         leaderActorContext.setReplicatedLog(
1317                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1318
1319         leaderActorContext.setCommitIndex(1);
1320
1321         followerActorContext.getReplicatedLog().removeFrom(0);
1322
1323         followerActorContext.setReplicatedLog(
1324                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1325
1326         // follower has the same log entries but its commit index > leaders commit index
1327         followerActorContext.setCommitIndex(2);
1328
1329         leader = new Leader(leaderActorContext);
1330
1331         // Initial heartbeat
1332         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1333
1334         assertEquals(1, appendEntries.getLeaderCommit());
1335         assertEquals(0, appendEntries.getEntries().size());
1336         assertEquals(0, appendEntries.getPrevLogIndex());
1337
1338         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1339                 leaderActor, AppendEntriesReply.class);
1340
1341         assertEquals(2, appendEntriesReply.getLogLastIndex());
1342         assertEquals(1, appendEntriesReply.getLogLastTerm());
1343
1344         leaderActor.underlyingActor().setBehavior(follower);
1345         leader.handleMessage(followerActor, appendEntriesReply);
1346
1347         leaderActor.underlyingActor().clear();
1348         followerActor.underlyingActor().clear();
1349
1350         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1351                 TimeUnit.MILLISECONDS);
1352
1353         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1354
1355         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1356
1357         assertEquals(2, appendEntries.getLeaderCommit());
1358         assertEquals(0, appendEntries.getEntries().size());
1359         assertEquals(2, appendEntries.getPrevLogIndex());
1360
1361         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1362
1363         assertEquals(2, appendEntriesReply.getLogLastIndex());
1364         assertEquals(1, appendEntriesReply.getLogLastTerm());
1365
1366         assertEquals(2, followerActorContext.getCommitIndex());
1367
1368         follower.close();
1369     }
1370
1371     @Test
1372     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1373         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1374
1375         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1376         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1377                 new FiniteDuration(1000, TimeUnit.SECONDS));
1378
1379         leaderActorContext.setReplicatedLog(
1380                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1381         long leaderCommitIndex = 2;
1382         leaderActorContext.setCommitIndex(leaderCommitIndex);
1383         leaderActorContext.setLastApplied(leaderCommitIndex);
1384
1385         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1386         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1387
1388         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1389
1390         followerActorContext.setReplicatedLog(
1391                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1392         followerActorContext.setCommitIndex(0);
1393         followerActorContext.setLastApplied(0);
1394
1395         Follower follower = new Follower(followerActorContext);
1396         followerActor.underlyingActor().setBehavior(follower);
1397
1398         leader = new Leader(leaderActorContext);
1399
1400         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1401         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1402
1403         MessageCollectorActor.clearMessages(followerActor);
1404         MessageCollectorActor.clearMessages(leaderActor);
1405
1406         // Verify initial AppendEntries sent with the leader's current commit index.
1407         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1409         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1410
1411         leaderActor.underlyingActor().setBehavior(leader);
1412
1413         leader.handleMessage(followerActor, appendEntriesReply);
1414
1415         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1416         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1417
1418         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1420         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1421
1422         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1423         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1424                 appendEntries.getEntries().get(0).getData());
1425         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1426         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1427                 appendEntries.getEntries().get(1).getData());
1428
1429         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1430         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1431
1432         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1433
1434         ApplyState applyState = applyStateList.get(0);
1435         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1436         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1438                 applyState.getReplicatedLogEntry().getData());
1439
1440         applyState = applyStateList.get(1);
1441         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1442         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1444                 applyState.getReplicatedLogEntry().getData());
1445
1446         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1447         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1448     }
1449
1450     @Test
1451     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1452         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1453
1454         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1455         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1456                 new FiniteDuration(1000, TimeUnit.SECONDS));
1457
1458         leaderActorContext.setReplicatedLog(
1459                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1460         long leaderCommitIndex = 1;
1461         leaderActorContext.setCommitIndex(leaderCommitIndex);
1462         leaderActorContext.setLastApplied(leaderCommitIndex);
1463
1464         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1465         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1466
1467         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1468
1469         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1470         followerActorContext.setCommitIndex(-1);
1471         followerActorContext.setLastApplied(-1);
1472
1473         Follower follower = new Follower(followerActorContext);
1474         followerActor.underlyingActor().setBehavior(follower);
1475         followerActorContext.setCurrentBehavior(follower);
1476
1477         leader = new Leader(leaderActorContext);
1478
1479         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1480         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1481
1482         MessageCollectorActor.clearMessages(followerActor);
1483         MessageCollectorActor.clearMessages(leaderActor);
1484
1485         // Verify initial AppendEntries sent with the leader's current commit index.
1486         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1487         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1488         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1489
1490         leaderActor.underlyingActor().setBehavior(leader);
1491         leaderActorContext.setCurrentBehavior(leader);
1492
1493         leader.handleMessage(followerActor, appendEntriesReply);
1494
1495         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1496         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1497
1498         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1499         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1500         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1501
1502         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1503         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1504                 appendEntries.getEntries().get(0).getData());
1505         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1506         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1507                 appendEntries.getEntries().get(1).getData());
1508
1509         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1510         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1511
1512         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1513
1514         ApplyState applyState = applyStateList.get(0);
1515         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1516         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1518                 applyState.getReplicatedLogEntry().getData());
1519
1520         applyState = applyStateList.get(1);
1521         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1522         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1523         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1524                 applyState.getReplicatedLogEntry().getData());
1525
1526         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1527         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1528     }
1529
1530     @Test
1531     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1532         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1533
1534         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1535         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1536                 new FiniteDuration(1000, TimeUnit.SECONDS));
1537
1538         leaderActorContext.setReplicatedLog(
1539                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1540         long leaderCommitIndex = 1;
1541         leaderActorContext.setCommitIndex(leaderCommitIndex);
1542         leaderActorContext.setLastApplied(leaderCommitIndex);
1543
1544         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1545         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1546
1547         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1548
1549         followerActorContext.setReplicatedLog(
1550                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1551         followerActorContext.setCommitIndex(-1);
1552         followerActorContext.setLastApplied(-1);
1553
1554         Follower follower = new Follower(followerActorContext);
1555         followerActor.underlyingActor().setBehavior(follower);
1556         followerActorContext.setCurrentBehavior(follower);
1557
1558         leader = new Leader(leaderActorContext);
1559
1560         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1561         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1562
1563         MessageCollectorActor.clearMessages(followerActor);
1564         MessageCollectorActor.clearMessages(leaderActor);
1565
1566         // Verify initial AppendEntries sent with the leader's current commit index.
1567         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1568         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1569         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1570
1571         leaderActor.underlyingActor().setBehavior(leader);
1572         leaderActorContext.setCurrentBehavior(leader);
1573
1574         leader.handleMessage(followerActor, appendEntriesReply);
1575
1576         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1577         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1578
1579         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1580         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1581         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1582
1583         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1584         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1585         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1586                 appendEntries.getEntries().get(0).getData());
1587         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1588         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1589         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1590                 appendEntries.getEntries().get(1).getData());
1591
1592         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1593         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1594
1595         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1596
1597         ApplyState applyState = applyStateList.get(0);
1598         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1599         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1601                 applyState.getReplicatedLogEntry().getData());
1602
1603         applyState = applyStateList.get(1);
1604         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1605         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1606         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1607                 applyState.getReplicatedLogEntry().getData());
1608
1609         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1610         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1611         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1612     }
1613
1614     @Test
1615     public void testHandleAppendEntriesReplyWithNewerTerm(){
1616         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1617
1618         MockRaftActorContext leaderActorContext = createActorContext();
1619         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1620                 new FiniteDuration(10000, TimeUnit.SECONDS));
1621
1622         leaderActorContext.setReplicatedLog(
1623                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1624
1625         leader = new Leader(leaderActorContext);
1626         leaderActor.underlyingActor().setBehavior(leader);
1627         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1628
1629         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1630
1631         assertEquals(false, appendEntriesReply.isSuccess());
1632         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1633
1634         MessageCollectorActor.clearMessages(leaderActor);
1635     }
1636
1637     @Test
1638     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1639         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1640
1641         MockRaftActorContext leaderActorContext = createActorContext();
1642         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1643                 new FiniteDuration(10000, TimeUnit.SECONDS));
1644
1645         leaderActorContext.setReplicatedLog(
1646                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1647         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1648
1649         leader = new Leader(leaderActorContext);
1650         leaderActor.underlyingActor().setBehavior(leader);
1651         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1652
1653         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1654
1655         assertEquals(false, appendEntriesReply.isSuccess());
1656         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1657
1658         MessageCollectorActor.clearMessages(leaderActor);
1659     }
1660
1661     @Test
1662     public void testHandleAppendEntriesReplySuccess() throws Exception {
1663         logStart("testHandleAppendEntriesReplySuccess");
1664
1665         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1666
1667         leaderActorContext.setReplicatedLog(
1668                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1669
1670         leaderActorContext.setCommitIndex(1);
1671         leaderActorContext.setLastApplied(1);
1672         leaderActorContext.getTermInformation().update(1, "leader");
1673
1674         leader = new Leader(leaderActorContext);
1675
1676         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1677
1678         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1679         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1680
1681         short payloadVersion = 5;
1682         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1683
1684         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1685
1686         assertEquals(RaftState.Leader, raftActorBehavior.state());
1687
1688         assertEquals(2, leaderActorContext.getCommitIndex());
1689
1690         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1691                 leaderActor, ApplyJournalEntries.class);
1692
1693         assertEquals(2, leaderActorContext.getLastApplied());
1694
1695         assertEquals(2, applyJournalEntries.getToIndex());
1696
1697         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1698                 ApplyState.class);
1699
1700         assertEquals(1,applyStateList.size());
1701
1702         ApplyState applyState = applyStateList.get(0);
1703
1704         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1705
1706         assertEquals(2, followerInfo.getMatchIndex());
1707         assertEquals(3, followerInfo.getNextIndex());
1708         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1709         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1710     }
1711
1712     @Test
1713     public void testHandleAppendEntriesReplyUnknownFollower(){
1714         logStart("testHandleAppendEntriesReplyUnknownFollower");
1715
1716         MockRaftActorContext leaderActorContext = createActorContext();
1717
1718         leader = new Leader(leaderActorContext);
1719
1720         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1721
1722         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1723
1724         assertEquals(RaftState.Leader, raftActorBehavior.state());
1725     }
1726
1727     @Test
1728     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1729         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1730
1731         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1732         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1733                 new FiniteDuration(1000, TimeUnit.SECONDS));
1734         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1735
1736         leaderActorContext.setReplicatedLog(
1737                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1738         long leaderCommitIndex = 3;
1739         leaderActorContext.setCommitIndex(leaderCommitIndex);
1740         leaderActorContext.setLastApplied(leaderCommitIndex);
1741
1742         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1743         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1744         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1745         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1746
1747         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1748
1749         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1750         followerActorContext.setCommitIndex(-1);
1751         followerActorContext.setLastApplied(-1);
1752
1753         Follower follower = new Follower(followerActorContext);
1754         followerActor.underlyingActor().setBehavior(follower);
1755         followerActorContext.setCurrentBehavior(follower);
1756
1757         leader = new Leader(leaderActorContext);
1758
1759         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1760         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1761
1762         MessageCollectorActor.clearMessages(followerActor);
1763         MessageCollectorActor.clearMessages(leaderActor);
1764
1765         // Verify initial AppendEntries sent with the leader's current commit index.
1766         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1767         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1768         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1769
1770         leaderActor.underlyingActor().setBehavior(leader);
1771         leaderActorContext.setCurrentBehavior(leader);
1772
1773         leader.handleMessage(followerActor, appendEntriesReply);
1774
1775         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1776         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1777
1778         appendEntries = appendEntriesList.get(0);
1779         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1780         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1781         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1782
1783         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1784         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1785                 appendEntries.getEntries().get(0).getData());
1786         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1787         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1788                 appendEntries.getEntries().get(1).getData());
1789
1790         appendEntries = appendEntriesList.get(1);
1791         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1792         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1793         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1794
1795         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1796         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1797                 appendEntries.getEntries().get(0).getData());
1798         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1799         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1800                 appendEntries.getEntries().get(1).getData());
1801
1802         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1803         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1804
1805         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1806
1807         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1808         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1809     }
1810
1811     @Test
1812     public void testHandleRequestVoteReply(){
1813         logStart("testHandleRequestVoteReply");
1814
1815         MockRaftActorContext leaderActorContext = createActorContext();
1816
1817         leader = new Leader(leaderActorContext);
1818
1819         // Should be a no-op.
1820         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1821                 new RequestVoteReply(1, true));
1822
1823         assertEquals(RaftState.Leader, raftActorBehavior.state());
1824
1825         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1826
1827         assertEquals(RaftState.Leader, raftActorBehavior.state());
1828     }
1829
1830     @Test
1831     public void testIsolatedLeaderCheckNoFollowers() {
1832         logStart("testIsolatedLeaderCheckNoFollowers");
1833
1834         MockRaftActorContext leaderActorContext = createActorContext();
1835
1836         leader = new Leader(leaderActorContext);
1837         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1838         assertTrue(behavior instanceof Leader);
1839     }
1840
1841     @Test
1842     public void testIsolatedLeaderCheckNoVotingFollowers() {
1843         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1844
1845         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1846         Follower follower = new Follower(followerActorContext);
1847         followerActor.underlyingActor().setBehavior(follower);
1848
1849         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1850         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1851                 new FiniteDuration(1000, TimeUnit.SECONDS));
1852         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1853
1854         leader = new Leader(leaderActorContext);
1855         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1856         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1857         assertTrue("Expected Leader", behavior instanceof Leader);
1858     }
1859
1860     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1861         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1862         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1863
1864         MockRaftActorContext leaderActorContext = createActorContext();
1865
1866         Map<String, String> peerAddresses = new HashMap<>();
1867         peerAddresses.put("follower-1", followerActor1.path().toString());
1868         peerAddresses.put("follower-2", followerActor2.path().toString());
1869
1870         leaderActorContext.setPeerAddresses(peerAddresses);
1871         leaderActorContext.setRaftPolicy(raftPolicy);
1872
1873         leader = new Leader(leaderActorContext);
1874
1875         leader.markFollowerActive("follower-1");
1876         leader.markFollowerActive("follower-2");
1877         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1878         assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1879
1880         // kill 1 follower and verify if that got killed
1881         final JavaTestKit probe = new JavaTestKit(getSystem());
1882         probe.watch(followerActor1);
1883         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1884         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1885         assertEquals(termMsg1.getActor(), followerActor1);
1886
1887         leader.markFollowerInActive("follower-1");
1888         leader.markFollowerActive("follower-2");
1889         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1890         assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1891
1892         // kill 2nd follower and leader should change to Isolated leader
1893         followerActor2.tell(PoisonPill.getInstance(), null);
1894         probe.watch(followerActor2);
1895         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1896         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1897         assertEquals(termMsg2.getActor(), followerActor2);
1898
1899         leader.markFollowerInActive("follower-2");
1900         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1901     }
1902
1903     @Test
1904     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1905         logStart("testIsolatedLeaderCheckTwoFollowers");
1906
1907         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1908
1909         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1910             behavior instanceof IsolatedLeader);
1911     }
1912
1913     @Test
1914     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1915         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1916
1917         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1918
1919         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1920                 behavior instanceof Leader);
1921     }
1922
1923     @Test
1924     public void testLaggingFollowerStarvation() throws Exception {
1925         logStart("testLaggingFollowerStarvation");
1926         new JavaTestKit(getSystem()) {{
1927             String leaderActorId = actorFactory.generateActorId("leader");
1928             String follower1ActorId = actorFactory.generateActorId("follower");
1929             String follower2ActorId = actorFactory.generateActorId("follower");
1930
1931             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1932                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1933             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1934             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1935
1936             MockRaftActorContext leaderActorContext =
1937                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1938
1939             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1940             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1941             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1942
1943             leaderActorContext.setConfigParams(configParams);
1944
1945             leaderActorContext.setReplicatedLog(
1946                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1947
1948             Map<String, String> peerAddresses = new HashMap<>();
1949             peerAddresses.put(follower1ActorId,
1950                     follower1Actor.path().toString());
1951             peerAddresses.put(follower2ActorId,
1952                     follower2Actor.path().toString());
1953
1954             leaderActorContext.setPeerAddresses(peerAddresses);
1955             leaderActorContext.getTermInformation().update(1, leaderActorId);
1956
1957             RaftActorBehavior leader = createBehavior(leaderActorContext);
1958
1959             leaderActor.underlyingActor().setBehavior(leader);
1960
1961             for(int i=1;i<6;i++) {
1962                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1963                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1964                 assertTrue(newBehavior == leader);
1965                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1966             }
1967
1968             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1969             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1970
1971             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1972                     heartbeats.size() > 1);
1973
1974             // Check if follower-2 got AppendEntries during this time and was not starved
1975             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1976
1977             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1978                     appendEntries.size() > 1);
1979
1980         }};
1981     }
1982
1983     @Test
1984     public void testReplicationConsensusWithNonVotingFollower() {
1985         logStart("testReplicationConsensusWithNonVotingFollower");
1986
1987         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1988         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1989                 new FiniteDuration(1000, TimeUnit.SECONDS));
1990
1991         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1992         leaderActorContext.setCommitIndex(-1);
1993
1994         String nonVotingFollowerId = "nonvoting-follower";
1995         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1996                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1997
1998         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1999
2000         leader = new Leader(leaderActorContext);
2001         leaderActorContext.setCurrentBehavior(leader);
2002
2003         // Ignore initial heartbeats
2004         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2005         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2006
2007         MessageCollectorActor.clearMessages(followerActor);
2008         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2009         MessageCollectorActor.clearMessages(leaderActor);
2010
2011         // Send a Replicate message and wait for AppendEntries.
2012         sendReplicate(leaderActorContext, 0);
2013
2014         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2015         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2016
2017         // Send reply only from the voting follower and verify consensus via ApplyState.
2018         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2019
2020         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2021
2022         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2023
2024         MessageCollectorActor.clearMessages(followerActor);
2025         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2026         MessageCollectorActor.clearMessages(leaderActor);
2027
2028         // Send another Replicate message
2029         sendReplicate(leaderActorContext, 1);
2030
2031         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2032         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2033                 AppendEntries.class);
2034         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2035         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2036
2037         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2038         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2039
2040         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2041
2042         // Send reply from the voting follower and verify consensus.
2043         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2044
2045         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2046     }
2047
2048     @Test
2049     public void testTransferLeadershipWithFollowerInSync() {
2050         logStart("testTransferLeadershipWithFollowerInSync");
2051
2052         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2053         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2054                 new FiniteDuration(1000, TimeUnit.SECONDS));
2055         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2056
2057         leader = new Leader(leaderActorContext);
2058         leaderActorContext.setCurrentBehavior(leader);
2059
2060         // Initial heartbeat
2061         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2062         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2063         MessageCollectorActor.clearMessages(followerActor);
2064
2065         sendReplicate(leaderActorContext, 0);
2066         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2067
2068         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2069         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2070         MessageCollectorActor.clearMessages(followerActor);
2071
2072         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2073         leader.transferLeadership(mockTransferCohort);
2074
2075         verify(mockTransferCohort, never()).transferComplete();
2076         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2077         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2078
2079         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2080         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2081
2082         // Leader should force an election timeout
2083         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2084
2085         verify(mockTransferCohort).transferComplete();
2086     }
2087
2088     @Test
2089     public void testTransferLeadershipWithEmptyLog() {
2090         logStart("testTransferLeadershipWithEmptyLog");
2091
2092         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2093         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2094                 new FiniteDuration(1000, TimeUnit.SECONDS));
2095         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2096
2097         leader = new Leader(leaderActorContext);
2098         leaderActorContext.setCurrentBehavior(leader);
2099
2100         // Initial heartbeat
2101         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2102         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2103         MessageCollectorActor.clearMessages(followerActor);
2104
2105         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2106         leader.transferLeadership(mockTransferCohort);
2107
2108         verify(mockTransferCohort, never()).transferComplete();
2109         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2111
2112         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2113         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2114
2115         // Leader should force an election timeout
2116         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2117
2118         verify(mockTransferCohort).transferComplete();
2119     }
2120
2121     @Test
2122     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2123         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2124
2125         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2128
2129         leader = new Leader(leaderActorContext);
2130         leaderActorContext.setCurrentBehavior(leader);
2131
2132         // Initial heartbeat
2133         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134         MessageCollectorActor.clearMessages(followerActor);
2135
2136         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2137         leader.transferLeadership(mockTransferCohort);
2138
2139         verify(mockTransferCohort, never()).transferComplete();
2140
2141         // Sync up the follower.
2142         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2143         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2144         MessageCollectorActor.clearMessages(followerActor);
2145
2146         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2147                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2148         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2149         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2150         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2151
2152         // Leader should force an election timeout
2153         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2154
2155         verify(mockTransferCohort).transferComplete();
2156     }
2157
2158     @Test
2159     public void testTransferLeadershipWithFollowerSyncTimeout() {
2160         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2161
2162         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2163         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2164                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2165         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2166         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2167
2168         leader = new Leader(leaderActorContext);
2169         leaderActorContext.setCurrentBehavior(leader);
2170
2171         // Initial heartbeat
2172         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2173         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2174         MessageCollectorActor.clearMessages(followerActor);
2175
2176         sendReplicate(leaderActorContext, 0);
2177         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178
2179         MessageCollectorActor.clearMessages(followerActor);
2180
2181         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2182         leader.transferLeadership(mockTransferCohort);
2183
2184         verify(mockTransferCohort, never()).transferComplete();
2185
2186         // Send heartbeats to time out the transfer.
2187         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2188             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2189                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2190             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2191         }
2192
2193         verify(mockTransferCohort).abortTransfer();
2194         verify(mockTransferCohort, never()).transferComplete();
2195         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2196     }
2197
2198     @Override
2199     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2200             ActorRef actorRef, RaftRPC rpc) throws Exception {
2201         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2202         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2203     }
2204
2205     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2206
2207         private final long electionTimeOutIntervalMillis;
2208         private final int snapshotChunkSize;
2209
2210         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2211             super();
2212             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2213             this.snapshotChunkSize = snapshotChunkSize;
2214         }
2215
2216         @Override
2217         public FiniteDuration getElectionTimeOutInterval() {
2218             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2219         }
2220
2221         @Override
2222         public int getSnapshotChunkSize() {
2223             return snapshotChunkSize;
2224         }
2225     }
2226 }