479ba8fbc5858c938eef4cd9075840835b16a039
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
40 import org.opendaylight.controller.cluster.raft.RaftState;
41 import org.opendaylight.controller.cluster.raft.RaftVersions;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
44 import org.opendaylight.controller.cluster.raft.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class LeaderTest extends AbstractLeaderTest {
69
70     static final String FOLLOWER_ID = "follower";
71     public static final String LEADER_ID = "leader";
72
73     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
74             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
75
76     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
77             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
78
79     private Leader leader;
80     private final short payloadVersion = 5;
81
82     @Override
83     @After
84     public void tearDown() throws Exception {
85         if(leader != null) {
86             leader.close();
87         }
88
89         super.tearDown();
90     }
91
92     @Test
93     public void testHandleMessageForUnknownMessage() throws Exception {
94         logStart("testHandleMessageForUnknownMessage");
95
96         leader = new Leader(createActorContext());
97
98         // handle message should return the Leader state when it receives an
99         // unknown message
100         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
101         Assert.assertTrue(behavior instanceof Leader);
102     }
103
104     @Test
105     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107
108         MockRaftActorContext actorContext = createActorContextWithFollower();
109         short payloadVersion = (short)5;
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116
117         // Leader should send an immediate heartbeat with no entries as follower is inactive.
118         long lastIndex = actorContext.getReplicatedLog().lastIndex();
119         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
120         assertEquals("getTerm", term, appendEntries.getTerm());
121         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
122         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
123         assertEquals("Entries size", 0, appendEntries.getEntries().size());
124         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
125
126         // The follower would normally reply - simulate that explicitly here.
127         leader.handleMessage(followerActor, new AppendEntriesReply(
128                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
129         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
130
131         followerActor.underlyingActor().clear();
132
133         // Sleep for the heartbeat interval so AppendEntries is sent.
134         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
135                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
136
137         leader.handleMessage(leaderActor, new SendHeartBeat());
138
139         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
140         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
141         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
142         assertEquals("Entries size", 1, appendEntries.getEntries().size());
143         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
144         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
145         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
146     }
147
148
149     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
150         return sendReplicate(actorContext, 1, index);
151     }
152
153     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
154         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156                 term, index, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
159     }
160
161     @Test
162     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
163         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
164
165         MockRaftActorContext actorContext = createActorContextWithFollower();
166
167         long term = 1;
168         actorContext.getTermInformation().update(term, "");
169
170         leader = new Leader(actorContext);
171
172         // Leader will send an immediate heartbeat - ignore it.
173         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
174
175         // The follower would normally reply - simulate that explicitly here.
176         long lastIndex = actorContext.getReplicatedLog().lastIndex();
177         leader.handleMessage(followerActor, new AppendEntriesReply(
178                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
179         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
180
181         followerActor.underlyingActor().clear();
182
183         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
184
185         // State should not change
186         assertTrue(raftBehavior instanceof Leader);
187
188         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
190         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
191         assertEquals("Entries size", 1, appendEntries.getEntries().size());
192         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
193         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
194         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
195         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
196     }
197
198     @Test
199     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
200         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
201
202         MockRaftActorContext actorContext = createActorContextWithFollower();
203
204         // The raft context is initialized with a couple log entries. However the commitIndex
205         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
206         // committed and applied. Now it regains leadership with a higher term (2).
207         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
208         long newTerm = prevTerm + 1;
209         actorContext.getTermInformation().update(newTerm, "");
210
211         leader = new Leader(actorContext);
212
213         // Leader will send an immediate heartbeat - ignore it.
214         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215
216         // The follower replies with the leader's current last index and term, simulating that it is
217         // up to date with the leader.
218         long lastIndex = actorContext.getReplicatedLog().lastIndex();
219         leader.handleMessage(followerActor, new AppendEntriesReply(
220                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
221
222         // The commit index should not get updated even though consensus was reached. This is b/c the
223         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
224         // from previous terms by counting replicas".
225         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
226
227         followerActor.underlyingActor().clear();
228
229         // Now replicate a new entry with the new term 2.
230         long newIndex = lastIndex + 1;
231         sendReplicate(actorContext, newTerm, newIndex);
232
233         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
235         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
236         assertEquals("Entries size", 1, appendEntries.getEntries().size());
237         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
238         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
239         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
240
241         // The follower replies with success. The leader should now update the commit index to the new index
242         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
243         // prior entries are committed indirectly".
244         leader.handleMessage(followerActor, new AppendEntriesReply(
245                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
246
247         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
248     }
249
250     @Test
251     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
252         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
253
254         MockRaftActorContext actorContext = createActorContextWithFollower();
255         actorContext.setRaftPolicy(createRaftPolicy(true, true));
256
257         long term = 1;
258         actorContext.getTermInformation().update(term, "");
259
260         leader = new Leader(actorContext);
261
262         // Leader will send an immediate heartbeat - ignore it.
263         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
264
265         // The follower would normally reply - simulate that explicitly here.
266         long lastIndex = actorContext.getReplicatedLog().lastIndex();
267         leader.handleMessage(followerActor, new AppendEntriesReply(
268                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
269         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
270
271         followerActor.underlyingActor().clear();
272
273         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
274
275         // State should not change
276         assertTrue(raftBehavior instanceof Leader);
277
278         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
279         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
280         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
281         assertEquals("Entries size", 1, appendEntries.getEntries().size());
282         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
283         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
284         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
285         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
286     }
287
288     @Test
289     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
290         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
291
292         MockRaftActorContext actorContext = createActorContextWithFollower();
293         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
294             @Override
295             public FiniteDuration getHeartBeatInterval() {
296                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
297             }
298         });
299
300         long term = 1;
301         actorContext.getTermInformation().update(term, "");
302
303         leader = new Leader(actorContext);
304
305         // Leader will send an immediate heartbeat - ignore it.
306         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
307
308         // The follower would normally reply - simulate that explicitly here.
309         long lastIndex = actorContext.getReplicatedLog().lastIndex();
310         leader.handleMessage(followerActor, new AppendEntriesReply(
311                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
312         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
313
314         followerActor.underlyingActor().clear();
315
316         for(int i=0;i<5;i++) {
317             sendReplicate(actorContext, lastIndex+i+1);
318         }
319
320         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
321         // We expect only 1 message to be sent because of two reasons,
322         // - an append entries reply was not received
323         // - the heartbeat interval has not expired
324         // In this scenario if multiple messages are sent they would likely be duplicates
325         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
326     }
327
328     @Test
329     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
330         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
331
332         MockRaftActorContext actorContext = createActorContextWithFollower();
333         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
334             @Override
335             public FiniteDuration getHeartBeatInterval() {
336                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
337             }
338         });
339
340         long term = 1;
341         actorContext.getTermInformation().update(term, "");
342
343         leader = new Leader(actorContext);
344
345         // Leader will send an immediate heartbeat - ignore it.
346         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
347
348         // The follower would normally reply - simulate that explicitly here.
349         long lastIndex = actorContext.getReplicatedLog().lastIndex();
350         leader.handleMessage(followerActor, new AppendEntriesReply(
351                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
352         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
353
354         followerActor.underlyingActor().clear();
355
356         for(int i=0;i<3;i++) {
357             sendReplicate(actorContext, lastIndex+i+1);
358             leader.handleMessage(followerActor, new AppendEntriesReply(
359                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
360
361         }
362
363         for(int i=3;i<5;i++) {
364             sendReplicate(actorContext, lastIndex + i + 1);
365         }
366
367         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
368         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
369         // get sent to the follower - but not the 5th
370         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
371
372         for(int i=0;i<4;i++) {
373             long expected = allMessages.get(i).getEntries().get(0).getIndex();
374             assertEquals(expected, i+2);
375         }
376     }
377
378     @Test
379     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
380         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
381
382         MockRaftActorContext actorContext = createActorContextWithFollower();
383         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
384             @Override
385             public FiniteDuration getHeartBeatInterval() {
386                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
387             }
388         });
389
390         long term = 1;
391         actorContext.getTermInformation().update(term, "");
392
393         leader = new Leader(actorContext);
394
395         // Leader will send an immediate heartbeat - ignore it.
396         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
397
398         // The follower would normally reply - simulate that explicitly here.
399         long lastIndex = actorContext.getReplicatedLog().lastIndex();
400         leader.handleMessage(followerActor, new AppendEntriesReply(
401                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
402         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
403
404         followerActor.underlyingActor().clear();
405
406         sendReplicate(actorContext, lastIndex+1);
407
408         // Wait slightly longer than heartbeat duration
409         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
410
411         leader.handleMessage(leaderActor, new SendHeartBeat());
412
413         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
414         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
415
416         assertEquals(1, allMessages.get(0).getEntries().size());
417         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
418         assertEquals(1, allMessages.get(1).getEntries().size());
419         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
420
421     }
422
423     @Test
424     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
425         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
426
427         MockRaftActorContext actorContext = createActorContextWithFollower();
428         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
429             @Override
430             public FiniteDuration getHeartBeatInterval() {
431                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
432             }
433         });
434
435         long term = 1;
436         actorContext.getTermInformation().update(term, "");
437
438         leader = new Leader(actorContext);
439
440         // Leader will send an immediate heartbeat - ignore it.
441         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
442
443         // The follower would normally reply - simulate that explicitly here.
444         long lastIndex = actorContext.getReplicatedLog().lastIndex();
445         leader.handleMessage(followerActor, new AppendEntriesReply(
446                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
447         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
448
449         followerActor.underlyingActor().clear();
450
451         for(int i=0;i<3;i++) {
452             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
453             leader.handleMessage(leaderActor, new SendHeartBeat());
454         }
455
456         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
457         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
458     }
459
460     @Test
461     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
462         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
463
464         MockRaftActorContext actorContext = createActorContextWithFollower();
465         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
466             @Override
467             public FiniteDuration getHeartBeatInterval() {
468                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
469             }
470         });
471
472         long term = 1;
473         actorContext.getTermInformation().update(term, "");
474
475         leader = new Leader(actorContext);
476
477         // Leader will send an immediate heartbeat - ignore it.
478         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
479
480         // The follower would normally reply - simulate that explicitly here.
481         long lastIndex = actorContext.getReplicatedLog().lastIndex();
482         leader.handleMessage(followerActor, new AppendEntriesReply(
483                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
484         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
485
486         followerActor.underlyingActor().clear();
487
488         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
489         leader.handleMessage(leaderActor, new SendHeartBeat());
490         sendReplicate(actorContext, lastIndex+1);
491
492         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
493         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
494
495         assertEquals(0, allMessages.get(0).getEntries().size());
496         assertEquals(1, allMessages.get(1).getEntries().size());
497     }
498
499
500     @Test
501     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
502         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
503
504         MockRaftActorContext actorContext = createActorContext();
505
506         leader = new Leader(actorContext);
507
508         actorContext.setLastApplied(0);
509
510         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
511         long term = actorContext.getTermInformation().getCurrentTerm();
512         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
513                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
514
515         actorContext.getReplicatedLog().append(newEntry);
516
517         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
518                 new Replicate(leaderActor, "state-id", newEntry));
519
520         // State should not change
521         assertTrue(raftBehavior instanceof Leader);
522
523         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
524
525         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
526         // one since lastApplied state is 0.
527         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
528                 leaderActor, ApplyState.class);
529         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
530
531         for(int i = 0; i <= newLogIndex - 1; i++ ) {
532             ApplyState applyState = applyStateList.get(i);
533             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
534             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
535         }
536
537         ApplyState last = applyStateList.get((int) newLogIndex - 1);
538         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
539         assertEquals("getIdentifier", "state-id", last.getIdentifier());
540     }
541
542     @Test
543     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
544         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
545
546         MockRaftActorContext actorContext = createActorContextWithFollower();
547
548         Map<String, String> leadersSnapshot = new HashMap<>();
549         leadersSnapshot.put("1", "A");
550         leadersSnapshot.put("2", "B");
551         leadersSnapshot.put("3", "C");
552
553         //clears leaders log
554         actorContext.getReplicatedLog().removeFrom(0);
555
556         final int commitIndex = 3;
557         final int snapshotIndex = 2;
558         final int newEntryIndex = 4;
559         final int snapshotTerm = 1;
560         final int currentTerm = 2;
561
562         // set the snapshot variables in replicatedlog
563         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
564         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
565         actorContext.setCommitIndex(commitIndex);
566         //set follower timeout to 2 mins, helps during debugging
567         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
568
569         leader = new Leader(actorContext);
570
571         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
572         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
573
574         // new entry
575         ReplicatedLogImplEntry entry =
576                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
577                         new MockRaftActorContext.MockPayload("D"));
578
579         //update follower timestamp
580         leader.markFollowerActive(FOLLOWER_ID);
581
582         ByteString bs = toByteString(leadersSnapshot);
583         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
584                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
585         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
586         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
587
588         //send first chunk and no InstallSnapshotReply received yet
589         fts.getNextChunk();
590         fts.incrementChunkIndex();
591
592         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
593                 TimeUnit.MILLISECONDS);
594
595         leader.handleMessage(leaderActor, new SendHeartBeat());
596
597         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
598
599         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
600
601         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
602
603         //InstallSnapshotReply received
604         fts.markSendStatus(true);
605
606         leader.handleMessage(leaderActor, new SendHeartBeat());
607
608         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
609
610         assertEquals(commitIndex, is.getLastIncludedIndex());
611     }
612
613     @Test
614     public void testSendAppendEntriesSnapshotScenario() throws Exception {
615         logStart("testSendAppendEntriesSnapshotScenario");
616
617         MockRaftActorContext actorContext = createActorContextWithFollower();
618
619         Map<String, String> leadersSnapshot = new HashMap<>();
620         leadersSnapshot.put("1", "A");
621         leadersSnapshot.put("2", "B");
622         leadersSnapshot.put("3", "C");
623
624         //clears leaders log
625         actorContext.getReplicatedLog().removeFrom(0);
626
627         final int followersLastIndex = 2;
628         final int snapshotIndex = 3;
629         final int newEntryIndex = 4;
630         final int snapshotTerm = 1;
631         final int currentTerm = 2;
632
633         // set the snapshot variables in replicatedlog
634         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
635         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
636         actorContext.setCommitIndex(followersLastIndex);
637
638         leader = new Leader(actorContext);
639
640         // Leader will send an immediate heartbeat - ignore it.
641         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
642
643         // new entry
644         ReplicatedLogImplEntry entry =
645                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
646                         new MockRaftActorContext.MockPayload("D"));
647
648         actorContext.getReplicatedLog().append(entry);
649
650         //update follower timestamp
651         leader.markFollowerActive(FOLLOWER_ID);
652
653         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
654         RaftActorBehavior raftBehavior = leader.handleMessage(
655                 leaderActor, new Replicate(null, "state-id", entry));
656
657         assertTrue(raftBehavior instanceof Leader);
658
659         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
660     }
661
662     @Test
663     public void testInitiateInstallSnapshot() throws Exception {
664         logStart("testInitiateInstallSnapshot");
665
666         MockRaftActorContext actorContext = createActorContextWithFollower();
667
668         //clears leaders log
669         actorContext.getReplicatedLog().removeFrom(0);
670
671         final int followersLastIndex = 2;
672         final int snapshotIndex = 3;
673         final int newEntryIndex = 4;
674         final int snapshotTerm = 1;
675         final int currentTerm = 2;
676
677         // set the snapshot variables in replicatedlog
678         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
679         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
680         actorContext.setLastApplied(3);
681         actorContext.setCommitIndex(followersLastIndex);
682
683         leader = new Leader(actorContext);
684
685         // Leader will send an immediate heartbeat - ignore it.
686         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
687
688         // set the snapshot as absent and check if capture-snapshot is invoked.
689         leader.setSnapshot(null);
690
691         // new entry
692         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
693                 new MockRaftActorContext.MockPayload("D"));
694
695         actorContext.getReplicatedLog().append(entry);
696
697         //update follower timestamp
698         leader.markFollowerActive(FOLLOWER_ID);
699
700         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
701
702         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
703
704         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
705
706         assertTrue(cs.isInstallSnapshotInitiated());
707         assertEquals(3, cs.getLastAppliedIndex());
708         assertEquals(1, cs.getLastAppliedTerm());
709         assertEquals(4, cs.getLastIndex());
710         assertEquals(2, cs.getLastTerm());
711
712         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
713         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
714
715         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
716     }
717
718     @Test
719     public void testInitiateForceInstallSnapshot() throws Exception {
720         logStart("testInitiateForceInstallSnapshot");
721
722         MockRaftActorContext actorContext = createActorContextWithFollower();
723
724         final int followersLastIndex = 2;
725         final int snapshotIndex = -1;
726         final int newEntryIndex = 4;
727         final int snapshotTerm = -1;
728         final int currentTerm = 2;
729
730         // set the snapshot variables in replicatedlog
731         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733         actorContext.setLastApplied(3);
734         actorContext.setCommitIndex(followersLastIndex);
735
736         actorContext.getReplicatedLog().removeFrom(0);
737
738         leader = new Leader(actorContext);
739
740         // Leader will send an immediate heartbeat - ignore it.
741         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
742
743         // set the snapshot as absent and check if capture-snapshot is invoked.
744         leader.setSnapshot(null);
745
746         for(int i=0;i<4;i++) {
747             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
748                     new MockRaftActorContext.MockPayload("X" + i)));
749         }
750
751         // new entry
752         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
753                 new MockRaftActorContext.MockPayload("D"));
754
755         actorContext.getReplicatedLog().append(entry);
756
757         //update follower timestamp
758         leader.markFollowerActive(FOLLOWER_ID);
759
760         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
761         // installed with a SendInstallSnapshot
762         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
763
764         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
765
766         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
767
768         assertTrue(cs.isInstallSnapshotInitiated());
769         assertEquals(3, cs.getLastAppliedIndex());
770         assertEquals(1, cs.getLastAppliedTerm());
771         assertEquals(4, cs.getLastIndex());
772         assertEquals(2, cs.getLastTerm());
773
774         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
775         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
776
777         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
778     }
779
780
781     @Test
782     public void testInstallSnapshot() throws Exception {
783         logStart("testInstallSnapshot");
784
785         MockRaftActorContext actorContext = createActorContextWithFollower();
786
787         Map<String, String> leadersSnapshot = new HashMap<>();
788         leadersSnapshot.put("1", "A");
789         leadersSnapshot.put("2", "B");
790         leadersSnapshot.put("3", "C");
791
792         //clears leaders log
793         actorContext.getReplicatedLog().removeFrom(0);
794
795         final int lastAppliedIndex = 3;
796         final int snapshotIndex = 2;
797         final int snapshotTerm = 1;
798         final int currentTerm = 2;
799
800         // set the snapshot variables in replicatedlog
801         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804         actorContext.setCommitIndex(lastAppliedIndex);
805         actorContext.setLastApplied(lastAppliedIndex);
806
807         leader = new Leader(actorContext);
808
809         // Initial heartbeat.
810         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
811
812         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
813         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
814
815         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
816                 Collections.<ReplicatedLogEntry>emptyList(),
817                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
818
819         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
820
821         assertTrue(raftBehavior instanceof Leader);
822
823         // check if installsnapshot gets called with the correct values.
824
825         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
826
827         assertNotNull(installSnapshot.getData());
828         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
829         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
830
831         assertEquals(currentTerm, installSnapshot.getTerm());
832     }
833
834     @Test
835     public void testForceInstallSnapshot() throws Exception {
836         logStart("testForceInstallSnapshot");
837
838         MockRaftActorContext actorContext = createActorContextWithFollower();
839
840         Map<String, String> leadersSnapshot = new HashMap<>();
841         leadersSnapshot.put("1", "A");
842         leadersSnapshot.put("2", "B");
843         leadersSnapshot.put("3", "C");
844
845         final int lastAppliedIndex = 3;
846         final int snapshotIndex = -1;
847         final int snapshotTerm = -1;
848         final int currentTerm = 2;
849
850         // set the snapshot variables in replicatedlog
851         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
852         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
853         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
854         actorContext.setCommitIndex(lastAppliedIndex);
855         actorContext.setLastApplied(lastAppliedIndex);
856
857         leader = new Leader(actorContext);
858
859         // Initial heartbeat.
860         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
861
862         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
863         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
864
865         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
866                 Collections.<ReplicatedLogEntry>emptyList(),
867                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
868
869         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
870
871         assertTrue(raftBehavior instanceof Leader);
872
873         // check if installsnapshot gets called with the correct values.
874
875         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
876
877         assertNotNull(installSnapshot.getData());
878         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
879         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
880
881         assertEquals(currentTerm, installSnapshot.getTerm());
882     }
883
884     @Test
885     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
886         logStart("testHandleInstallSnapshotReplyLastChunk");
887
888         MockRaftActorContext actorContext = createActorContextWithFollower();
889
890         final int commitIndex = 3;
891         final int snapshotIndex = 2;
892         final int snapshotTerm = 1;
893         final int currentTerm = 2;
894
895         actorContext.setCommitIndex(commitIndex);
896
897         leader = new Leader(actorContext);
898
899         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
900         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
901
902         // Ignore initial heartbeat.
903         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
904
905         Map<String, String> leadersSnapshot = new HashMap<>();
906         leadersSnapshot.put("1", "A");
907         leadersSnapshot.put("2", "B");
908         leadersSnapshot.put("3", "C");
909
910         // set the snapshot variables in replicatedlog
911
912         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
913         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
914         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
915
916         ByteString bs = toByteString(leadersSnapshot);
917         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
918                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
919         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
920         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
921         while(!fts.isLastChunk(fts.getChunkIndex())) {
922             fts.getNextChunk();
923             fts.incrementChunkIndex();
924         }
925
926         //clears leaders log
927         actorContext.getReplicatedLog().removeFrom(0);
928
929         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
930                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
931
932         assertTrue(raftBehavior instanceof Leader);
933
934         assertEquals(0, leader.followerSnapshotSize());
935         assertEquals(1, leader.followerLogSize());
936         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
937         assertNotNull(fli);
938         assertEquals(commitIndex, fli.getMatchIndex());
939         assertEquals(commitIndex + 1, fli.getNextIndex());
940     }
941
942     @Test
943     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
944         logStart("testSendSnapshotfromInstallSnapshotReply");
945
946         MockRaftActorContext actorContext = createActorContextWithFollower();
947
948         final int commitIndex = 3;
949         final int snapshotIndex = 2;
950         final int snapshotTerm = 1;
951         final int currentTerm = 2;
952
953         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
954             @Override
955             public int getSnapshotChunkSize() {
956                 return 50;
957             }
958         };
959         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
960         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
961
962         actorContext.setConfigParams(configParams);
963         actorContext.setCommitIndex(commitIndex);
964
965         leader = new Leader(actorContext);
966
967         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
968         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
969
970         Map<String, String> leadersSnapshot = new HashMap<>();
971         leadersSnapshot.put("1", "A");
972         leadersSnapshot.put("2", "B");
973         leadersSnapshot.put("3", "C");
974
975         // set the snapshot variables in replicatedlog
976         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
977         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
978         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
979
980         ByteString bs = toByteString(leadersSnapshot);
981         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
982                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
983         leader.setSnapshot(snapshot);
984
985         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
986
987         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
988
989         assertEquals(1, installSnapshot.getChunkIndex());
990         assertEquals(3, installSnapshot.getTotalChunks());
991
992         followerActor.underlyingActor().clear();
993         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
994                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
995
996         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
997
998         assertEquals(2, installSnapshot.getChunkIndex());
999         assertEquals(3, installSnapshot.getTotalChunks());
1000
1001         followerActor.underlyingActor().clear();
1002         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1003                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1004
1005         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1006
1007         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1008         followerActor.underlyingActor().clear();
1009         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1010                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1011
1012         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1013
1014         Assert.assertNull(installSnapshot);
1015     }
1016
1017
1018     @Test
1019     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1020         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1021
1022         MockRaftActorContext actorContext = createActorContextWithFollower();
1023
1024         final int commitIndex = 3;
1025         final int snapshotIndex = 2;
1026         final int snapshotTerm = 1;
1027         final int currentTerm = 2;
1028
1029         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1030             @Override
1031             public int getSnapshotChunkSize() {
1032                 return 50;
1033             }
1034         });
1035
1036         actorContext.setCommitIndex(commitIndex);
1037
1038         leader = new Leader(actorContext);
1039
1040         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1041         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1042
1043         Map<String, String> leadersSnapshot = new HashMap<>();
1044         leadersSnapshot.put("1", "A");
1045         leadersSnapshot.put("2", "B");
1046         leadersSnapshot.put("3", "C");
1047
1048         // set the snapshot variables in replicatedlog
1049         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1050         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1051         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1052
1053         ByteString bs = toByteString(leadersSnapshot);
1054         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1055                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1056         leader.setSnapshot(snapshot);
1057
1058         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1059         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1060
1061         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1062
1063         assertEquals(1, installSnapshot.getChunkIndex());
1064         assertEquals(3, installSnapshot.getTotalChunks());
1065
1066         followerActor.underlyingActor().clear();
1067
1068         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1069                 FOLLOWER_ID, -1, false));
1070
1071         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1072                 TimeUnit.MILLISECONDS);
1073
1074         leader.handleMessage(leaderActor, new SendHeartBeat());
1075
1076         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1077
1078         assertEquals(1, installSnapshot.getChunkIndex());
1079         assertEquals(3, installSnapshot.getTotalChunks());
1080     }
1081
1082     @Test
1083     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1084         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1085
1086         MockRaftActorContext actorContext = createActorContextWithFollower();
1087
1088         final int commitIndex = 3;
1089         final int snapshotIndex = 2;
1090         final int snapshotTerm = 1;
1091         final int currentTerm = 2;
1092
1093         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1094             @Override
1095             public int getSnapshotChunkSize() {
1096                 return 50;
1097             }
1098         });
1099
1100         actorContext.setCommitIndex(commitIndex);
1101
1102         leader = new Leader(actorContext);
1103
1104         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1105         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1106
1107         Map<String, String> leadersSnapshot = new HashMap<>();
1108         leadersSnapshot.put("1", "A");
1109         leadersSnapshot.put("2", "B");
1110         leadersSnapshot.put("3", "C");
1111
1112         // set the snapshot variables in replicatedlog
1113         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1114         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1115         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1116
1117         ByteString bs = toByteString(leadersSnapshot);
1118         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1119                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1120         leader.setSnapshot(snapshot);
1121
1122         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1123
1124         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1125
1126         assertEquals(1, installSnapshot.getChunkIndex());
1127         assertEquals(3, installSnapshot.getTotalChunks());
1128         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1129
1130         int hashCode = Arrays.hashCode(installSnapshot.getData());
1131
1132         followerActor.underlyingActor().clear();
1133
1134         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1135                 FOLLOWER_ID, 1, true));
1136
1137         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1138
1139         assertEquals(2, installSnapshot.getChunkIndex());
1140         assertEquals(3, installSnapshot.getTotalChunks());
1141         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1142     }
1143
1144     @Test
1145     public void testFollowerToSnapshotLogic() {
1146         logStart("testFollowerToSnapshotLogic");
1147
1148         MockRaftActorContext actorContext = createActorContext();
1149
1150         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1151             @Override
1152             public int getSnapshotChunkSize() {
1153                 return 50;
1154             }
1155         });
1156
1157         leader = new Leader(actorContext);
1158
1159         Map<String, String> leadersSnapshot = new HashMap<>();
1160         leadersSnapshot.put("1", "A");
1161         leadersSnapshot.put("2", "B");
1162         leadersSnapshot.put("3", "C");
1163
1164         ByteString bs = toByteString(leadersSnapshot);
1165         byte[] barray = bs.toByteArray();
1166
1167         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1168         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1169
1170         assertEquals(bs.size(), barray.length);
1171
1172         int chunkIndex=0;
1173         for (int i=0; i < barray.length; i = i + 50) {
1174             int j = i + 50;
1175             chunkIndex++;
1176
1177             if (i + 50 > barray.length) {
1178                 j = barray.length;
1179             }
1180
1181             byte[] chunk = fts.getNextChunk();
1182             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1183             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1184
1185             fts.markSendStatus(true);
1186             if (!fts.isLastChunk(chunkIndex)) {
1187                 fts.incrementChunkIndex();
1188             }
1189         }
1190
1191         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1192     }
1193
1194     @Override protected RaftActorBehavior createBehavior(
1195         RaftActorContext actorContext) {
1196         return new Leader(actorContext);
1197     }
1198
1199     @Override
1200     protected MockRaftActorContext createActorContext() {
1201         return createActorContext(leaderActor);
1202     }
1203
1204     @Override
1205     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1206         return createActorContext(LEADER_ID, actorRef);
1207     }
1208
1209     private MockRaftActorContext createActorContextWithFollower() {
1210         MockRaftActorContext actorContext = createActorContext();
1211         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1212                 followerActor.path().toString()).build());
1213         return actorContext;
1214     }
1215
1216     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1217         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1218         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1219         configParams.setElectionTimeoutFactor(100000);
1220         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1221         context.setConfigParams(configParams);
1222         context.setPayloadVersion(payloadVersion);
1223         return context;
1224     }
1225
1226     private MockRaftActorContext createFollowerActorContextWithLeader() {
1227         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1228         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1229         followerConfig.setElectionTimeoutFactor(10000);
1230         followerActorContext.setConfigParams(followerConfig);
1231         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1232         return followerActorContext;
1233     }
1234
1235     @Test
1236     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1237         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1238
1239         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1240
1241         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1242
1243         Follower follower = new Follower(followerActorContext);
1244         followerActor.underlyingActor().setBehavior(follower);
1245
1246         Map<String, String> peerAddresses = new HashMap<>();
1247         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1248
1249         leaderActorContext.setPeerAddresses(peerAddresses);
1250
1251         leaderActorContext.getReplicatedLog().removeFrom(0);
1252
1253         //create 3 entries
1254         leaderActorContext.setReplicatedLog(
1255                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1256
1257         leaderActorContext.setCommitIndex(1);
1258
1259         followerActorContext.getReplicatedLog().removeFrom(0);
1260
1261         // follower too has the exact same log entries and has the same commit index
1262         followerActorContext.setReplicatedLog(
1263                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1264
1265         followerActorContext.setCommitIndex(1);
1266
1267         leader = new Leader(leaderActorContext);
1268
1269         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1270
1271         assertEquals(1, appendEntries.getLeaderCommit());
1272         assertEquals(0, appendEntries.getEntries().size());
1273         assertEquals(0, appendEntries.getPrevLogIndex());
1274
1275         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1276                 leaderActor, AppendEntriesReply.class);
1277
1278         assertEquals(2, appendEntriesReply.getLogLastIndex());
1279         assertEquals(1, appendEntriesReply.getLogLastTerm());
1280
1281         // follower returns its next index
1282         assertEquals(2, appendEntriesReply.getLogLastIndex());
1283         assertEquals(1, appendEntriesReply.getLogLastTerm());
1284
1285         follower.close();
1286     }
1287
1288     @Test
1289     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1290         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1291
1292         MockRaftActorContext leaderActorContext = createActorContext();
1293
1294         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1295         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296
1297         Follower follower = new Follower(followerActorContext);
1298         followerActor.underlyingActor().setBehavior(follower);
1299
1300         Map<String, String> leaderPeerAddresses = new HashMap<>();
1301         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1302
1303         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1304
1305         leaderActorContext.getReplicatedLog().removeFrom(0);
1306
1307         leaderActorContext.setReplicatedLog(
1308                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309
1310         leaderActorContext.setCommitIndex(1);
1311
1312         followerActorContext.getReplicatedLog().removeFrom(0);
1313
1314         followerActorContext.setReplicatedLog(
1315                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1316
1317         // follower has the same log entries but its commit index > leaders commit index
1318         followerActorContext.setCommitIndex(2);
1319
1320         leader = new Leader(leaderActorContext);
1321
1322         // Initial heartbeat
1323         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1324
1325         assertEquals(1, appendEntries.getLeaderCommit());
1326         assertEquals(0, appendEntries.getEntries().size());
1327         assertEquals(0, appendEntries.getPrevLogIndex());
1328
1329         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1330                 leaderActor, AppendEntriesReply.class);
1331
1332         assertEquals(2, appendEntriesReply.getLogLastIndex());
1333         assertEquals(1, appendEntriesReply.getLogLastTerm());
1334
1335         leaderActor.underlyingActor().setBehavior(follower);
1336         leader.handleMessage(followerActor, appendEntriesReply);
1337
1338         leaderActor.underlyingActor().clear();
1339         followerActor.underlyingActor().clear();
1340
1341         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1342                 TimeUnit.MILLISECONDS);
1343
1344         leader.handleMessage(leaderActor, new SendHeartBeat());
1345
1346         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1347
1348         assertEquals(2, appendEntries.getLeaderCommit());
1349         assertEquals(0, appendEntries.getEntries().size());
1350         assertEquals(2, appendEntries.getPrevLogIndex());
1351
1352         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1353
1354         assertEquals(2, appendEntriesReply.getLogLastIndex());
1355         assertEquals(1, appendEntriesReply.getLogLastTerm());
1356
1357         assertEquals(2, followerActorContext.getCommitIndex());
1358
1359         follower.close();
1360     }
1361
1362     @Test
1363     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1364         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1365
1366         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1367         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1368                 new FiniteDuration(1000, TimeUnit.SECONDS));
1369
1370         leaderActorContext.setReplicatedLog(
1371                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1372         long leaderCommitIndex = 2;
1373         leaderActorContext.setCommitIndex(leaderCommitIndex);
1374         leaderActorContext.setLastApplied(leaderCommitIndex);
1375
1376         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1377         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1378
1379         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1380
1381         followerActorContext.setReplicatedLog(
1382                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1383         followerActorContext.setCommitIndex(0);
1384         followerActorContext.setLastApplied(0);
1385
1386         Follower follower = new Follower(followerActorContext);
1387         followerActor.underlyingActor().setBehavior(follower);
1388
1389         leader = new Leader(leaderActorContext);
1390
1391         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1393
1394         MessageCollectorActor.clearMessages(followerActor);
1395         MessageCollectorActor.clearMessages(leaderActor);
1396
1397         // Verify initial AppendEntries sent with the leader's current commit index.
1398         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1399         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1400         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1401
1402         leaderActor.underlyingActor().setBehavior(leader);
1403
1404         leader.handleMessage(followerActor, appendEntriesReply);
1405
1406         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1407         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1408
1409         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1410         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1411         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1412
1413         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1414         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1415                 appendEntries.getEntries().get(0).getData());
1416         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1417         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1418                 appendEntries.getEntries().get(1).getData());
1419
1420         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1421         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1422
1423         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1424
1425         ApplyState applyState = applyStateList.get(0);
1426         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1427         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1428         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1429                 applyState.getReplicatedLogEntry().getData());
1430
1431         applyState = applyStateList.get(1);
1432         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1433         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1434         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1435                 applyState.getReplicatedLogEntry().getData());
1436
1437         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1438         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1439     }
1440
1441     @Test
1442     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1443         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1444
1445         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1446         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1447                 new FiniteDuration(1000, TimeUnit.SECONDS));
1448
1449         leaderActorContext.setReplicatedLog(
1450                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1451         long leaderCommitIndex = 1;
1452         leaderActorContext.setCommitIndex(leaderCommitIndex);
1453         leaderActorContext.setLastApplied(leaderCommitIndex);
1454
1455         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1456         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1457
1458         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1459
1460         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1461         followerActorContext.setCommitIndex(-1);
1462         followerActorContext.setLastApplied(-1);
1463
1464         Follower follower = new Follower(followerActorContext);
1465         followerActor.underlyingActor().setBehavior(follower);
1466
1467         leader = new Leader(leaderActorContext);
1468
1469         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1470         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1471
1472         MessageCollectorActor.clearMessages(followerActor);
1473         MessageCollectorActor.clearMessages(leaderActor);
1474
1475         // Verify initial AppendEntries sent with the leader's current commit index.
1476         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1477         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1478         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479
1480         leaderActor.underlyingActor().setBehavior(leader);
1481
1482         leader.handleMessage(followerActor, appendEntriesReply);
1483
1484         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1485         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1486
1487         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1488         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1489         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1490
1491         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1492         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1493                 appendEntries.getEntries().get(0).getData());
1494         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1495         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1496                 appendEntries.getEntries().get(1).getData());
1497
1498         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1499         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1500
1501         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1502
1503         ApplyState applyState = applyStateList.get(0);
1504         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1505         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1506         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1507                 applyState.getReplicatedLogEntry().getData());
1508
1509         applyState = applyStateList.get(1);
1510         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1511         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1512         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1513                 applyState.getReplicatedLogEntry().getData());
1514
1515         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1516         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1517     }
1518
1519     @Test
1520     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1521         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1522
1523         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1524         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1525                 new FiniteDuration(1000, TimeUnit.SECONDS));
1526
1527         leaderActorContext.setReplicatedLog(
1528                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1529         long leaderCommitIndex = 1;
1530         leaderActorContext.setCommitIndex(leaderCommitIndex);
1531         leaderActorContext.setLastApplied(leaderCommitIndex);
1532
1533         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1534         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1535
1536         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1537
1538         followerActorContext.setReplicatedLog(
1539                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1540         followerActorContext.setCommitIndex(-1);
1541         followerActorContext.setLastApplied(-1);
1542
1543         Follower follower = new Follower(followerActorContext);
1544         followerActor.underlyingActor().setBehavior(follower);
1545
1546         leader = new Leader(leaderActorContext);
1547
1548         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1549         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1550
1551         MessageCollectorActor.clearMessages(followerActor);
1552         MessageCollectorActor.clearMessages(leaderActor);
1553
1554         // Verify initial AppendEntries sent with the leader's current commit index.
1555         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1556         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1557         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1558
1559         leaderActor.underlyingActor().setBehavior(leader);
1560
1561         leader.handleMessage(followerActor, appendEntriesReply);
1562
1563         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1564         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1565
1566         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1567         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1568         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1569
1570         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1571         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1572         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1573                 appendEntries.getEntries().get(0).getData());
1574         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1575         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1576         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1577                 appendEntries.getEntries().get(1).getData());
1578
1579         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1580         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1581
1582         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1583
1584         ApplyState applyState = applyStateList.get(0);
1585         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1586         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1587         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1588                 applyState.getReplicatedLogEntry().getData());
1589
1590         applyState = applyStateList.get(1);
1591         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1592         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1593         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1594                 applyState.getReplicatedLogEntry().getData());
1595
1596         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1597         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1598         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1599     }
1600
1601     @Test
1602     public void testHandleAppendEntriesReplyWithNewerTerm(){
1603         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1604
1605         MockRaftActorContext leaderActorContext = createActorContext();
1606         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1607                 new FiniteDuration(10000, TimeUnit.SECONDS));
1608
1609         leaderActorContext.setReplicatedLog(
1610                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1611
1612         leader = new Leader(leaderActorContext);
1613         leaderActor.underlyingActor().setBehavior(leader);
1614         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1615
1616         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1617
1618         assertEquals(false, appendEntriesReply.isSuccess());
1619         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1620
1621         MessageCollectorActor.clearMessages(leaderActor);
1622     }
1623
1624     @Test
1625     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1626         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1627
1628         MockRaftActorContext leaderActorContext = createActorContext();
1629         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1630                 new FiniteDuration(10000, TimeUnit.SECONDS));
1631
1632         leaderActorContext.setReplicatedLog(
1633                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1634         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1635
1636         leader = new Leader(leaderActorContext);
1637         leaderActor.underlyingActor().setBehavior(leader);
1638         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1639
1640         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1641
1642         assertEquals(false, appendEntriesReply.isSuccess());
1643         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1644
1645         MessageCollectorActor.clearMessages(leaderActor);
1646     }
1647
1648     @Test
1649     public void testHandleAppendEntriesReplySuccess() throws Exception {
1650         logStart("testHandleAppendEntriesReplySuccess");
1651
1652         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1653
1654         leaderActorContext.setReplicatedLog(
1655                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1656
1657         leaderActorContext.setCommitIndex(1);
1658         leaderActorContext.setLastApplied(1);
1659         leaderActorContext.getTermInformation().update(1, "leader");
1660
1661         leader = new Leader(leaderActorContext);
1662
1663         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1664
1665         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1666         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1667
1668         short payloadVersion = 5;
1669         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1670
1671         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1672
1673         assertEquals(RaftState.Leader, raftActorBehavior.state());
1674
1675         assertEquals(2, leaderActorContext.getCommitIndex());
1676
1677         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1678                 leaderActor, ApplyJournalEntries.class);
1679
1680         assertEquals(2, leaderActorContext.getLastApplied());
1681
1682         assertEquals(2, applyJournalEntries.getToIndex());
1683
1684         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1685                 ApplyState.class);
1686
1687         assertEquals(1,applyStateList.size());
1688
1689         ApplyState applyState = applyStateList.get(0);
1690
1691         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1692
1693         assertEquals(2, followerInfo.getMatchIndex());
1694         assertEquals(3, followerInfo.getNextIndex());
1695         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1696         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1697     }
1698
1699     @Test
1700     public void testHandleAppendEntriesReplyUnknownFollower(){
1701         logStart("testHandleAppendEntriesReplyUnknownFollower");
1702
1703         MockRaftActorContext leaderActorContext = createActorContext();
1704
1705         leader = new Leader(leaderActorContext);
1706
1707         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1708
1709         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1710
1711         assertEquals(RaftState.Leader, raftActorBehavior.state());
1712     }
1713
1714     @Test
1715     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1716         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1717
1718         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1719         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1720                 new FiniteDuration(1000, TimeUnit.SECONDS));
1721         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1722
1723         leaderActorContext.setReplicatedLog(
1724                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1725         long leaderCommitIndex = 3;
1726         leaderActorContext.setCommitIndex(leaderCommitIndex);
1727         leaderActorContext.setLastApplied(leaderCommitIndex);
1728
1729         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1730         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1731         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1732         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1733
1734         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1735
1736         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1737         followerActorContext.setCommitIndex(-1);
1738         followerActorContext.setLastApplied(-1);
1739
1740         Follower follower = new Follower(followerActorContext);
1741         followerActor.underlyingActor().setBehavior(follower);
1742
1743         leader = new Leader(leaderActorContext);
1744
1745         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1746         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1747
1748         MessageCollectorActor.clearMessages(followerActor);
1749         MessageCollectorActor.clearMessages(leaderActor);
1750
1751         // Verify initial AppendEntries sent with the leader's current commit index.
1752         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1753         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1754         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1755
1756         leaderActor.underlyingActor().setBehavior(leader);
1757
1758         leader.handleMessage(followerActor, appendEntriesReply);
1759
1760         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1761         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1762
1763         appendEntries = appendEntriesList.get(0);
1764         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1765         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1766         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1767
1768         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1769         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1770                 appendEntries.getEntries().get(0).getData());
1771         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1772         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1773                 appendEntries.getEntries().get(1).getData());
1774
1775         appendEntries = appendEntriesList.get(1);
1776         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1777         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1778         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1779
1780         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1781         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1782                 appendEntries.getEntries().get(0).getData());
1783         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1784         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1785                 appendEntries.getEntries().get(1).getData());
1786
1787         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1788         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1789
1790         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1791
1792         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1793         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1794     }
1795
1796     @Test
1797     public void testHandleRequestVoteReply(){
1798         logStart("testHandleRequestVoteReply");
1799
1800         MockRaftActorContext leaderActorContext = createActorContext();
1801
1802         leader = new Leader(leaderActorContext);
1803
1804         // Should be a no-op.
1805         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1806                 new RequestVoteReply(1, true));
1807
1808         assertEquals(RaftState.Leader, raftActorBehavior.state());
1809
1810         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1811
1812         assertEquals(RaftState.Leader, raftActorBehavior.state());
1813     }
1814
1815     @Test
1816     public void testIsolatedLeaderCheckNoFollowers() {
1817         logStart("testIsolatedLeaderCheckNoFollowers");
1818
1819         MockRaftActorContext leaderActorContext = createActorContext();
1820
1821         leader = new Leader(leaderActorContext);
1822         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1823         Assert.assertTrue(behavior instanceof Leader);
1824     }
1825
1826     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1827         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1828         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1829
1830         MockRaftActorContext leaderActorContext = createActorContext();
1831
1832         Map<String, String> peerAddresses = new HashMap<>();
1833         peerAddresses.put("follower-1", followerActor1.path().toString());
1834         peerAddresses.put("follower-2", followerActor2.path().toString());
1835
1836         leaderActorContext.setPeerAddresses(peerAddresses);
1837         leaderActorContext.setRaftPolicy(raftPolicy);
1838
1839         leader = new Leader(leaderActorContext);
1840
1841         leader.markFollowerActive("follower-1");
1842         leader.markFollowerActive("follower-2");
1843         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1844         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1845                 behavior instanceof Leader);
1846
1847         // kill 1 follower and verify if that got killed
1848         final JavaTestKit probe = new JavaTestKit(getSystem());
1849         probe.watch(followerActor1);
1850         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1851         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1852         assertEquals(termMsg1.getActor(), followerActor1);
1853
1854         leader.markFollowerInActive("follower-1");
1855         leader.markFollowerActive("follower-2");
1856         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1857         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1858                 behavior instanceof Leader);
1859
1860         // kill 2nd follower and leader should change to Isolated leader
1861         followerActor2.tell(PoisonPill.getInstance(), null);
1862         probe.watch(followerActor2);
1863         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1864         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1865         assertEquals(termMsg2.getActor(), followerActor2);
1866
1867         leader.markFollowerInActive("follower-2");
1868         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1869     }
1870
1871     @Test
1872     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1873         logStart("testIsolatedLeaderCheckTwoFollowers");
1874
1875         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1876
1877         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1878             behavior instanceof IsolatedLeader);
1879     }
1880
1881     @Test
1882     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1883         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1884
1885         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1886
1887         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1888                 behavior instanceof Leader);
1889     }
1890
1891     @Test
1892     public void testLaggingFollowerStarvation() throws Exception {
1893         logStart("testLaggingFollowerStarvation");
1894         new JavaTestKit(getSystem()) {{
1895             String leaderActorId = actorFactory.generateActorId("leader");
1896             String follower1ActorId = actorFactory.generateActorId("follower");
1897             String follower2ActorId = actorFactory.generateActorId("follower");
1898
1899             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1900                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1901             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1902             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1903
1904             MockRaftActorContext leaderActorContext =
1905                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1906
1907             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1908             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1909             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1910
1911             leaderActorContext.setConfigParams(configParams);
1912
1913             leaderActorContext.setReplicatedLog(
1914                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1915
1916             Map<String, String> peerAddresses = new HashMap<>();
1917             peerAddresses.put(follower1ActorId,
1918                     follower1Actor.path().toString());
1919             peerAddresses.put(follower2ActorId,
1920                     follower2Actor.path().toString());
1921
1922             leaderActorContext.setPeerAddresses(peerAddresses);
1923             leaderActorContext.getTermInformation().update(1, leaderActorId);
1924
1925             RaftActorBehavior leader = createBehavior(leaderActorContext);
1926
1927             leaderActor.underlyingActor().setBehavior(leader);
1928
1929             for(int i=1;i<6;i++) {
1930                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1931                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1932                 assertTrue(newBehavior == leader);
1933                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1934             }
1935
1936             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1937             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1938
1939             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1940                     heartbeats.size() > 1);
1941
1942             // Check if follower-2 got AppendEntries during this time and was not starved
1943             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1944
1945             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1946                     appendEntries.size() > 1);
1947
1948         }};
1949     }
1950
1951     @Test
1952     public void testReplicationConsensusWithNonVotingFollower() {
1953         logStart("testReplicationConsensusWithNonVotingFollower");
1954
1955         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1956         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1957                 new FiniteDuration(1000, TimeUnit.SECONDS));
1958
1959         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1960
1961         String nonVotingFollowerId = "nonvoting-follower";
1962         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1963                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1964
1965         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1966
1967         leader = new Leader(leaderActorContext);
1968
1969         // Ignore initial heartbeats
1970         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1971         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1972
1973         MessageCollectorActor.clearMessages(followerActor);
1974         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1975         MessageCollectorActor.clearMessages(leaderActor);
1976
1977         // Send a Replicate message and wait for AppendEntries.
1978         sendReplicate(leaderActorContext, 0);
1979
1980         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1981         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1982
1983         // Send reply only from the voting follower and verify consensus via ApplyState.
1984         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1985
1986         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1987
1988         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1989
1990         MessageCollectorActor.clearMessages(followerActor);
1991         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1992         MessageCollectorActor.clearMessages(leaderActor);
1993
1994         // Send another Replicate message
1995         sendReplicate(leaderActorContext, 1);
1996
1997         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1998         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1999                 AppendEntries.class);
2000         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2001         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2002
2003         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2004         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2005
2006         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2007
2008         // Send reply from the voting follower and verify consensus.
2009         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2010
2011         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2012     }
2013
2014     @Test
2015     public void testTransferLeadershipWithFollowerInSync() {
2016         logStart("testTransferLeadershipWithFollowerInSync");
2017
2018         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2019         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2020                 new FiniteDuration(1000, TimeUnit.SECONDS));
2021         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2022
2023         leader = new Leader(leaderActorContext);
2024
2025         // Initial heartbeat
2026         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2027         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2028         MessageCollectorActor.clearMessages(followerActor);
2029
2030         sendReplicate(leaderActorContext, 0);
2031         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2032
2033         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2034         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2035         MessageCollectorActor.clearMessages(followerActor);
2036
2037         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2038         leader.transferLeadership(mockTransferCohort);
2039
2040         verify(mockTransferCohort, never()).transferComplete();
2041         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2042         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2043
2044         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2045         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2046
2047         // Leader should force an election timeout
2048         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2049
2050         verify(mockTransferCohort).transferComplete();
2051     }
2052
2053     @Test
2054     public void testTransferLeadershipWithEmptyLog() {
2055         logStart("testTransferLeadershipWithEmptyLog");
2056
2057         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2058         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2059                 new FiniteDuration(1000, TimeUnit.SECONDS));
2060         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2061
2062         leader = new Leader(leaderActorContext);
2063
2064         // Initial heartbeat
2065         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2067         MessageCollectorActor.clearMessages(followerActor);
2068
2069         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2070         leader.transferLeadership(mockTransferCohort);
2071
2072         verify(mockTransferCohort, never()).transferComplete();
2073         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2075
2076         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2077         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2078
2079         // Leader should force an election timeout
2080         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2081
2082         verify(mockTransferCohort).transferComplete();
2083     }
2084
2085     @Test
2086     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2087         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2088
2089         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2090         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2091                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2092
2093         leader = new Leader(leaderActorContext);
2094
2095         // Initial heartbeat
2096         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2097         MessageCollectorActor.clearMessages(followerActor);
2098
2099         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2100         leader.transferLeadership(mockTransferCohort);
2101
2102         verify(mockTransferCohort, never()).transferComplete();
2103
2104         // Sync up the follower.
2105         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107         MessageCollectorActor.clearMessages(followerActor);
2108
2109         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2110                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2111         leader.handleMessage(leaderActor, new SendHeartBeat());
2112         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2113         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2114
2115         // Leader should force an election timeout
2116         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2117
2118         verify(mockTransferCohort).transferComplete();
2119     }
2120
2121     @Test
2122     public void testTransferLeadershipWithFollowerSyncTimeout() {
2123         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2124
2125         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2128         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2129         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2130
2131         leader = new Leader(leaderActorContext);
2132
2133         // Initial heartbeat
2134         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2135         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2136         MessageCollectorActor.clearMessages(followerActor);
2137
2138         sendReplicate(leaderActorContext, 0);
2139         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140
2141         MessageCollectorActor.clearMessages(followerActor);
2142
2143         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2144         leader.transferLeadership(mockTransferCohort);
2145
2146         verify(mockTransferCohort, never()).transferComplete();
2147
2148         // Send heartbeats to time out the transfer.
2149         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2150             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2151                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2152             leader.handleMessage(leaderActor, new SendHeartBeat());
2153         }
2154
2155         verify(mockTransferCohort).abortTransfer();
2156         verify(mockTransferCohort, never()).transferComplete();
2157         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2158     }
2159
2160     @Override
2161     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2162             ActorRef actorRef, RaftRPC rpc) throws Exception {
2163         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2164         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2165     }
2166
2167     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2168
2169         private final long electionTimeOutIntervalMillis;
2170         private final int snapshotChunkSize;
2171
2172         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2173             super();
2174             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2175             this.snapshotChunkSize = snapshotChunkSize;
2176         }
2177
2178         @Override
2179         public FiniteDuration getElectionTimeOutInterval() {
2180             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2181         }
2182
2183         @Override
2184         public int getSnapshotChunkSize() {
2185             return snapshotChunkSize;
2186         }
2187     }
2188 }