filter-valve: use lambdas
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.JavaTestKit;
27 import akka.testkit.TestActorRef;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
46 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
49 import org.opendaylight.controller.cluster.raft.RaftState;
50 import org.opendaylight.controller.cluster.raft.RaftVersions;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.VotingState;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
57 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
60 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
66 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
67 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
68 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
71 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
72 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import org.opendaylight.yangtools.concepts.Identifier;
76 import scala.concurrent.duration.FiniteDuration;
77
78 public class LeaderTest extends AbstractLeaderTest<Leader> {
79
80     static final String FOLLOWER_ID = "follower";
81     public static final String LEADER_ID = "leader";
82
83     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
84             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
85
86     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
87             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
88
89     private Leader leader;
90     private final short payloadVersion = 5;
91
92     @Override
93     @After
94     public void tearDown() throws Exception {
95         if (leader != null) {
96             leader.close();
97         }
98
99         super.tearDown();
100     }
101
102     @Test
103     public void testHandleMessageForUnknownMessage() throws Exception {
104         logStart("testHandleMessageForUnknownMessage");
105
106         leader = new Leader(createActorContext());
107
108         // handle message should null when it receives an unknown message
109         assertNull(leader.handleMessage(followerActor, "foo"));
110     }
111
112     @Test
113     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
114         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
115
116         MockRaftActorContext actorContext = createActorContextWithFollower();
117         actorContext.setCommitIndex(-1);
118         actorContext.setPayloadVersion(payloadVersion);
119
120         long term = 1;
121         actorContext.getTermInformation().update(term, "");
122
123         leader = new Leader(actorContext);
124         actorContext.setCurrentBehavior(leader);
125
126         // Leader should send an immediate heartbeat with no entries as follower is inactive.
127         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
128         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
129         assertEquals("getTerm", term, appendEntries.getTerm());
130         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
131         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
132         assertEquals("Entries size", 0, appendEntries.getEntries().size());
133         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
134
135         // The follower would normally reply - simulate that explicitly here.
136         leader.handleMessage(followerActor, new AppendEntriesReply(
137                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
138         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
139
140         followerActor.underlyingActor().clear();
141
142         // Sleep for the heartbeat interval so AppendEntries is sent.
143         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
144                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
145
146         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
147
148         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
149         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
150         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
151         assertEquals("Entries size", 1, appendEntries.getEntries().size());
152         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
153         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
154         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
155     }
156
157
158     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
159         return sendReplicate(actorContext, 1, index);
160     }
161
162     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
163         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
164         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
165         actorContext.getReplicatedLog().append(newEntry);
166         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
167     }
168
169     @Test
170     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
171         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
172
173         MockRaftActorContext actorContext = createActorContextWithFollower();
174
175         long term = 1;
176         actorContext.getTermInformation().update(term, "");
177
178         leader = new Leader(actorContext);
179
180         // Leader will send an immediate heartbeat - ignore it.
181         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
182
183         // The follower would normally reply - simulate that explicitly here.
184         long lastIndex = actorContext.getReplicatedLog().lastIndex();
185         leader.handleMessage(followerActor, new AppendEntriesReply(
186                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
187         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
188
189         followerActor.underlyingActor().clear();
190
191         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
192
193         // State should not change
194         assertTrue(raftBehavior instanceof Leader);
195
196         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
197         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
198         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
199         assertEquals("Entries size", 1, appendEntries.getEntries().size());
200         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
201         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
202         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
203         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
204     }
205
206     @Test
207     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
208         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
209
210         MockRaftActorContext actorContext = createActorContextWithFollower();
211         actorContext.setCommitIndex(-1);
212         actorContext.setLastApplied(-1);
213
214         // The raft context is initialized with a couple log entries. However the commitIndex
215         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
216         // committed and applied. Now it regains leadership with a higher term (2).
217         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
218         long newTerm = prevTerm + 1;
219         actorContext.getTermInformation().update(newTerm, "");
220
221         leader = new Leader(actorContext);
222         actorContext.setCurrentBehavior(leader);
223
224         // Leader will send an immediate heartbeat - ignore it.
225         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
226
227         // The follower replies with the leader's current last index and term, simulating that it is
228         // up to date with the leader.
229         long lastIndex = actorContext.getReplicatedLog().lastIndex();
230         leader.handleMessage(followerActor, new AppendEntriesReply(
231                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
232
233         // The commit index should not get updated even though consensus was reached. This is b/c the
234         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
235         // from previous terms by counting replicas".
236         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
237
238         followerActor.underlyingActor().clear();
239
240         // Now replicate a new entry with the new term 2.
241         long newIndex = lastIndex + 1;
242         sendReplicate(actorContext, newTerm, newIndex);
243
244         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
245         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
246         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
247         assertEquals("Entries size", 1, appendEntries.getEntries().size());
248         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
249         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
250         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
251
252         // The follower replies with success. The leader should now update the commit index to the new index
253         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
254         // prior entries are committed indirectly".
255         leader.handleMessage(followerActor, new AppendEntriesReply(
256                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
257
258         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
259     }
260
261     @Test
262     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
263         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
264
265         MockRaftActorContext actorContext = createActorContextWithFollower();
266         actorContext.setRaftPolicy(createRaftPolicy(true, true));
267
268         long term = 1;
269         actorContext.getTermInformation().update(term, "");
270
271         leader = new Leader(actorContext);
272
273         // Leader will send an immediate heartbeat - ignore it.
274         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
275
276         // The follower would normally reply - simulate that explicitly here.
277         long lastIndex = actorContext.getReplicatedLog().lastIndex();
278         leader.handleMessage(followerActor, new AppendEntriesReply(
279                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
280         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
281
282         followerActor.underlyingActor().clear();
283
284         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
285
286         // State should not change
287         assertTrue(raftBehavior instanceof Leader);
288
289         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
290         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
291         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
292         assertEquals("Entries size", 1, appendEntries.getEntries().size());
293         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
294         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
295         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
296         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
297     }
298
299     @Test
300     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
301         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
302
303         MockRaftActorContext actorContext = createActorContextWithFollower();
304         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
305             @Override
306             public FiniteDuration getHeartBeatInterval() {
307                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
308             }
309         });
310
311         long term = 1;
312         actorContext.getTermInformation().update(term, "");
313
314         leader = new Leader(actorContext);
315
316         // Leader will send an immediate heartbeat - ignore it.
317         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
318
319         // The follower would normally reply - simulate that explicitly here.
320         long lastIndex = actorContext.getReplicatedLog().lastIndex();
321         leader.handleMessage(followerActor, new AppendEntriesReply(
322                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
323         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
324
325         followerActor.underlyingActor().clear();
326
327         for (int i = 0; i < 5; i++) {
328             sendReplicate(actorContext, lastIndex + i + 1);
329         }
330
331         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
332         // We expect only 1 message to be sent because of two reasons,
333         // - an append entries reply was not received
334         // - the heartbeat interval has not expired
335         // In this scenario if multiple messages are sent they would likely be duplicates
336         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
337     }
338
339     @Test
340     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
341         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
342
343         MockRaftActorContext actorContext = createActorContextWithFollower();
344         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
345             @Override
346             public FiniteDuration getHeartBeatInterval() {
347                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
348             }
349         });
350
351         long term = 1;
352         actorContext.getTermInformation().update(term, "");
353
354         leader = new Leader(actorContext);
355
356         // Leader will send an immediate heartbeat - ignore it.
357         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
358
359         // The follower would normally reply - simulate that explicitly here.
360         long lastIndex = actorContext.getReplicatedLog().lastIndex();
361         leader.handleMessage(followerActor, new AppendEntriesReply(
362                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
363         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
364
365         followerActor.underlyingActor().clear();
366
367         for (int i = 0; i < 3; i++) {
368             sendReplicate(actorContext, lastIndex + i + 1);
369             leader.handleMessage(followerActor, new AppendEntriesReply(
370                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
371
372         }
373
374         for (int i = 3; i < 5; i++) {
375             sendReplicate(actorContext, lastIndex + i + 1);
376         }
377
378         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
379         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
380         // get sent to the follower - but not the 5th
381         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
382
383         for (int i = 0; i < 4; i++) {
384             long expected = allMessages.get(i).getEntries().get(0).getIndex();
385             assertEquals(expected, i + 2);
386         }
387     }
388
389     @Test
390     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
391         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
392
393         MockRaftActorContext actorContext = createActorContextWithFollower();
394         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
395             @Override
396             public FiniteDuration getHeartBeatInterval() {
397                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
398             }
399         });
400
401         long term = 1;
402         actorContext.getTermInformation().update(term, "");
403
404         leader = new Leader(actorContext);
405
406         // Leader will send an immediate heartbeat - ignore it.
407         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
408
409         // The follower would normally reply - simulate that explicitly here.
410         long lastIndex = actorContext.getReplicatedLog().lastIndex();
411         leader.handleMessage(followerActor, new AppendEntriesReply(
412                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
413         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
414
415         followerActor.underlyingActor().clear();
416
417         sendReplicate(actorContext, lastIndex + 1);
418
419         // Wait slightly longer than heartbeat duration
420         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
421
422         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
423
424         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
425         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
426
427         assertEquals(1, allMessages.get(0).getEntries().size());
428         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
429         assertEquals(1, allMessages.get(1).getEntries().size());
430         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
431
432     }
433
434     @Test
435     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
436         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
437
438         MockRaftActorContext actorContext = createActorContextWithFollower();
439         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
440             @Override
441             public FiniteDuration getHeartBeatInterval() {
442                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
443             }
444         });
445
446         long term = 1;
447         actorContext.getTermInformation().update(term, "");
448
449         leader = new Leader(actorContext);
450
451         // Leader will send an immediate heartbeat - ignore it.
452         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
453
454         // The follower would normally reply - simulate that explicitly here.
455         long lastIndex = actorContext.getReplicatedLog().lastIndex();
456         leader.handleMessage(followerActor, new AppendEntriesReply(
457                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
458         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
459
460         followerActor.underlyingActor().clear();
461
462         for (int i = 0; i < 3; i++) {
463             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
464             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
465         }
466
467         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
468         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
469     }
470
471     @Test
472     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
473         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
474
475         MockRaftActorContext actorContext = createActorContextWithFollower();
476         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
477             @Override
478             public FiniteDuration getHeartBeatInterval() {
479                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
480             }
481         });
482
483         long term = 1;
484         actorContext.getTermInformation().update(term, "");
485
486         leader = new Leader(actorContext);
487
488         // Leader will send an immediate heartbeat - ignore it.
489         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
490
491         // The follower would normally reply - simulate that explicitly here.
492         long lastIndex = actorContext.getReplicatedLog().lastIndex();
493         leader.handleMessage(followerActor, new AppendEntriesReply(
494                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
495         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
496
497         followerActor.underlyingActor().clear();
498
499         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
500         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
501         sendReplicate(actorContext, lastIndex + 1);
502
503         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
504         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
505
506         assertEquals(0, allMessages.get(0).getEntries().size());
507         assertEquals(1, allMessages.get(1).getEntries().size());
508     }
509
510
511     @Test
512     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
513         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
514
515         MockRaftActorContext actorContext = createActorContext();
516
517         leader = new Leader(actorContext);
518
519         actorContext.setLastApplied(0);
520
521         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
522         long term = actorContext.getTermInformation().getCurrentTerm();
523         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
524                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
525
526         actorContext.getReplicatedLog().append(newEntry);
527
528         final Identifier id = new MockIdentifier("state-id");
529         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
530                 new Replicate(leaderActor, id, newEntry, true));
531
532         // State should not change
533         assertTrue(raftBehavior instanceof Leader);
534
535         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
536
537         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
538         // one since lastApplied state is 0.
539         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
540                 leaderActor, ApplyState.class);
541         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
542
543         for (int i = 0; i <= newLogIndex - 1; i++) {
544             ApplyState applyState = applyStateList.get(i);
545             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
546             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
547         }
548
549         ApplyState last = applyStateList.get((int) newLogIndex - 1);
550         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
551         assertEquals("getIdentifier", id, last.getIdentifier());
552     }
553
554     @Test
555     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
556         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
557
558         final MockRaftActorContext actorContext = createActorContextWithFollower();
559
560         Map<String, String> leadersSnapshot = new HashMap<>();
561         leadersSnapshot.put("1", "A");
562         leadersSnapshot.put("2", "B");
563         leadersSnapshot.put("3", "C");
564
565         //clears leaders log
566         actorContext.getReplicatedLog().removeFrom(0);
567
568         final int commitIndex = 3;
569         final int snapshotIndex = 2;
570         final int snapshotTerm = 1;
571
572         // set the snapshot variables in replicatedlog
573         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
574         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
575         actorContext.setCommitIndex(commitIndex);
576         //set follower timeout to 2 mins, helps during debugging
577         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
578
579         leader = new Leader(actorContext);
580
581         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
582         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
583
584         //update follower timestamp
585         leader.markFollowerActive(FOLLOWER_ID);
586
587         ByteString bs = toByteString(leadersSnapshot);
588         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
589                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
590                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
591         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
592                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
593         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
594         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
595
596         //send first chunk and no InstallSnapshotReply received yet
597         fts.getNextChunk();
598         fts.incrementChunkIndex();
599
600         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
601                 TimeUnit.MILLISECONDS);
602
603         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
604
605         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
606
607         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
608
609         //InstallSnapshotReply received
610         fts.markSendStatus(true);
611
612         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
613
614         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
615
616         assertEquals(commitIndex, is.getLastIncludedIndex());
617     }
618
619     @Test
620     public void testSendAppendEntriesSnapshotScenario() throws Exception {
621         logStart("testSendAppendEntriesSnapshotScenario");
622
623         final MockRaftActorContext actorContext = createActorContextWithFollower();
624
625         Map<String, String> leadersSnapshot = new HashMap<>();
626         leadersSnapshot.put("1", "A");
627         leadersSnapshot.put("2", "B");
628         leadersSnapshot.put("3", "C");
629
630         //clears leaders log
631         actorContext.getReplicatedLog().removeFrom(0);
632
633         final int followersLastIndex = 2;
634         final int snapshotIndex = 3;
635         final int newEntryIndex = 4;
636         final int snapshotTerm = 1;
637         final int currentTerm = 2;
638
639         // set the snapshot variables in replicatedlog
640         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
641         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
642         actorContext.setCommitIndex(followersLastIndex);
643
644         leader = new Leader(actorContext);
645
646         // Leader will send an immediate heartbeat - ignore it.
647         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
648
649         // new entry
650         SimpleReplicatedLogEntry entry =
651                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
652                         new MockRaftActorContext.MockPayload("D"));
653
654         actorContext.getReplicatedLog().append(entry);
655
656         //update follower timestamp
657         leader.markFollowerActive(FOLLOWER_ID);
658
659         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
660         RaftActorBehavior raftBehavior = leader.handleMessage(
661                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
662
663         assertTrue(raftBehavior instanceof Leader);
664
665         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
666     }
667
668     @Test
669     public void testInitiateInstallSnapshot() throws Exception {
670         logStart("testInitiateInstallSnapshot");
671
672         MockRaftActorContext actorContext = createActorContextWithFollower();
673
674         //clears leaders log
675         actorContext.getReplicatedLog().removeFrom(0);
676
677         final int followersLastIndex = 2;
678         final int snapshotIndex = 3;
679         final int newEntryIndex = 4;
680         final int snapshotTerm = 1;
681         final int currentTerm = 2;
682
683         // set the snapshot variables in replicatedlog
684         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
685         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
686         actorContext.setLastApplied(3);
687         actorContext.setCommitIndex(followersLastIndex);
688
689         leader = new Leader(actorContext);
690
691         // Leader will send an immediate heartbeat - ignore it.
692         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
693
694         // set the snapshot as absent and check if capture-snapshot is invoked.
695         leader.setSnapshot(null);
696
697         // new entry
698         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
699                 new MockRaftActorContext.MockPayload("D"));
700
701         actorContext.getReplicatedLog().append(entry);
702
703         //update follower timestamp
704         leader.markFollowerActive(FOLLOWER_ID);
705
706         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
707
708         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
709
710         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
711
712         assertEquals(3, cs.getLastAppliedIndex());
713         assertEquals(1, cs.getLastAppliedTerm());
714         assertEquals(4, cs.getLastIndex());
715         assertEquals(2, cs.getLastTerm());
716
717         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
718         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
719
720         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
721     }
722
723     @Test
724     public void testInitiateForceInstallSnapshot() throws Exception {
725         logStart("testInitiateForceInstallSnapshot");
726
727         MockRaftActorContext actorContext = createActorContextWithFollower();
728
729         final int followersLastIndex = 2;
730         final int snapshotIndex = -1;
731         final int newEntryIndex = 4;
732         final int snapshotTerm = -1;
733         final int currentTerm = 2;
734
735         // set the snapshot variables in replicatedlog
736         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738         actorContext.setLastApplied(3);
739         actorContext.setCommitIndex(followersLastIndex);
740
741         actorContext.getReplicatedLog().removeFrom(0);
742
743         AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
744         actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out));
745
746         leader = new Leader(actorContext);
747         actorContext.setCurrentBehavior(leader);
748
749         // Leader will send an immediate heartbeat - ignore it.
750         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
751
752         // set the snapshot as absent and check if capture-snapshot is invoked.
753         leader.setSnapshot(null);
754
755         for (int i = 0; i < 4; i++) {
756             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
757                     new MockRaftActorContext.MockPayload("X" + i)));
758         }
759
760         // new entry
761         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
762                 new MockRaftActorContext.MockPayload("D"));
763
764         actorContext.getReplicatedLog().append(entry);
765
766         //update follower timestamp
767         leader.markFollowerActive(FOLLOWER_ID);
768
769         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
770         // installed with a SendInstallSnapshot
771         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
772
773         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
774
775         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
776         assertEquals(3, cs.getLastAppliedIndex());
777         assertEquals(1, cs.getLastAppliedTerm());
778         assertEquals(4, cs.getLastIndex());
779         assertEquals(2, cs.getLastTerm());
780
781         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
782         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
783
784         MessageCollectorActor.clearMessages(followerActor);
785
786         // Sending Replicate message should not initiate another capture since the first is in progress.
787         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
788         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
789
790         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
791         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
792         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
793
794         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
795         final byte[] bytes = new byte[]{1, 2, 3};
796         installSnapshotStream.get().get().write(bytes);
797         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
798                 Runtime.getRuntime().totalMemory());
799         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
800
801         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
802         MessageCollectorActor.clearMessages(followerActor);
803         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
804         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
805     }
806
807
808     @Test
809     public void testInstallSnapshot() throws Exception {
810         logStart("testInstallSnapshot");
811
812         final MockRaftActorContext actorContext = createActorContextWithFollower();
813
814         Map<String, String> leadersSnapshot = new HashMap<>();
815         leadersSnapshot.put("1", "A");
816         leadersSnapshot.put("2", "B");
817         leadersSnapshot.put("3", "C");
818
819         //clears leaders log
820         actorContext.getReplicatedLog().removeFrom(0);
821
822         final int lastAppliedIndex = 3;
823         final int snapshotIndex = 2;
824         final int snapshotTerm = 1;
825         final int currentTerm = 2;
826
827         // set the snapshot variables in replicatedlog
828         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
829         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
830         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
831         actorContext.setCommitIndex(lastAppliedIndex);
832         actorContext.setLastApplied(lastAppliedIndex);
833
834         leader = new Leader(actorContext);
835
836         // Initial heartbeat.
837         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
838
839         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
840         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
841
842         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
843         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
844                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
845
846         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
847                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
848
849         assertTrue(raftBehavior instanceof Leader);
850
851         // check if installsnapshot gets called with the correct values.
852
853         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
854                 InstallSnapshot.class);
855
856         assertNotNull(installSnapshot.getData());
857         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
858         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
859
860         assertEquals(currentTerm, installSnapshot.getTerm());
861     }
862
863     @Test
864     public void testForceInstallSnapshot() throws Exception {
865         logStart("testForceInstallSnapshot");
866
867         final MockRaftActorContext actorContext = createActorContextWithFollower();
868
869         Map<String, String> leadersSnapshot = new HashMap<>();
870         leadersSnapshot.put("1", "A");
871         leadersSnapshot.put("2", "B");
872         leadersSnapshot.put("3", "C");
873
874         final int lastAppliedIndex = 3;
875         final int snapshotIndex = -1;
876         final int snapshotTerm = -1;
877         final int currentTerm = 2;
878
879         // set the snapshot variables in replicatedlog
880         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
881         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
882         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
883         actorContext.setCommitIndex(lastAppliedIndex);
884         actorContext.setLastApplied(lastAppliedIndex);
885
886         leader = new Leader(actorContext);
887
888         // Initial heartbeat.
889         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
890
891         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
892         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
893
894         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
895         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
896                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
897
898         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
899                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
900
901         assertTrue(raftBehavior instanceof Leader);
902
903         // check if installsnapshot gets called with the correct values.
904
905         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
906                 InstallSnapshot.class);
907
908         assertNotNull(installSnapshot.getData());
909         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
910         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
911
912         assertEquals(currentTerm, installSnapshot.getTerm());
913     }
914
915     @Test
916     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
917         logStart("testHandleInstallSnapshotReplyLastChunk");
918
919         MockRaftActorContext actorContext = createActorContextWithFollower();
920
921         final int commitIndex = 3;
922         final int snapshotIndex = 2;
923         final int snapshotTerm = 1;
924         final int currentTerm = 2;
925
926         actorContext.setCommitIndex(commitIndex);
927
928         leader = new Leader(actorContext);
929         actorContext.setCurrentBehavior(leader);
930
931         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
932         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
933
934         // Ignore initial heartbeat.
935         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
936
937         Map<String, String> leadersSnapshot = new HashMap<>();
938         leadersSnapshot.put("1", "A");
939         leadersSnapshot.put("2", "B");
940         leadersSnapshot.put("3", "C");
941
942         // set the snapshot variables in replicatedlog
943
944         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
945         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
946         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
947
948         ByteString bs = toByteString(leadersSnapshot);
949         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
950                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
951                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
952         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
953                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
954         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
955         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
956         while (!fts.isLastChunk(fts.getChunkIndex())) {
957             fts.getNextChunk();
958             fts.incrementChunkIndex();
959         }
960
961         //clears leaders log
962         actorContext.getReplicatedLog().removeFrom(0);
963
964         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
965                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
966
967         assertTrue(raftBehavior instanceof Leader);
968
969         assertEquals(1, leader.followerLogSize());
970         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
971         assertNotNull(fli);
972         assertNull(fli.getInstallSnapshotState());
973         assertEquals(commitIndex, fli.getMatchIndex());
974         assertEquals(commitIndex + 1, fli.getNextIndex());
975         assertFalse(leader.hasSnapshot());
976     }
977
978     @Test
979     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
980         logStart("testSendSnapshotfromInstallSnapshotReply");
981
982         MockRaftActorContext actorContext = createActorContextWithFollower();
983
984         final int commitIndex = 3;
985         final int snapshotIndex = 2;
986         final int snapshotTerm = 1;
987         final int currentTerm = 2;
988
989         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
990             @Override
991             public int getSnapshotChunkSize() {
992                 return 50;
993             }
994         };
995         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
996         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
997
998         actorContext.setConfigParams(configParams);
999         actorContext.setCommitIndex(commitIndex);
1000
1001         leader = new Leader(actorContext);
1002         actorContext.setCurrentBehavior(leader);
1003
1004         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1005         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1006
1007         Map<String, String> leadersSnapshot = new HashMap<>();
1008         leadersSnapshot.put("1", "A");
1009         leadersSnapshot.put("2", "B");
1010         leadersSnapshot.put("3", "C");
1011
1012         // set the snapshot variables in replicatedlog
1013         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1014         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1015         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1016
1017         ByteString bs = toByteString(leadersSnapshot);
1018         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1019                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1020                 -1, null, null);
1021
1022         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1023
1024         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1025                 InstallSnapshot.class);
1026
1027         assertEquals(1, installSnapshot.getChunkIndex());
1028         assertEquals(3, installSnapshot.getTotalChunks());
1029
1030         followerActor.underlyingActor().clear();
1031         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1032                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1033
1034         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1035
1036         assertEquals(2, installSnapshot.getChunkIndex());
1037         assertEquals(3, installSnapshot.getTotalChunks());
1038
1039         followerActor.underlyingActor().clear();
1040         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1041                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1042
1043         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1044
1045         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1046         followerActor.underlyingActor().clear();
1047         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1048                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1049
1050         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1051
1052         assertNull(installSnapshot);
1053     }
1054
1055
1056     @Test
1057     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1058         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1059
1060         MockRaftActorContext actorContext = createActorContextWithFollower();
1061
1062         final int commitIndex = 3;
1063         final int snapshotIndex = 2;
1064         final int snapshotTerm = 1;
1065         final int currentTerm = 2;
1066
1067         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1068             @Override
1069             public int getSnapshotChunkSize() {
1070                 return 50;
1071             }
1072         });
1073
1074         actorContext.setCommitIndex(commitIndex);
1075
1076         leader = new Leader(actorContext);
1077
1078         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1079         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1080
1081         Map<String, String> leadersSnapshot = new HashMap<>();
1082         leadersSnapshot.put("1", "A");
1083         leadersSnapshot.put("2", "B");
1084         leadersSnapshot.put("3", "C");
1085
1086         // set the snapshot variables in replicatedlog
1087         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1088         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1089         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1090
1091         ByteString bs = toByteString(leadersSnapshot);
1092         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1093                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1094                 -1, null, null);
1095
1096         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1097         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1098
1099         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1100                 InstallSnapshot.class);
1101
1102         assertEquals(1, installSnapshot.getChunkIndex());
1103         assertEquals(3, installSnapshot.getTotalChunks());
1104
1105         followerActor.underlyingActor().clear();
1106
1107         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1108                 FOLLOWER_ID, -1, false));
1109
1110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1111                 TimeUnit.MILLISECONDS);
1112
1113         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1114
1115         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1116
1117         assertEquals(1, installSnapshot.getChunkIndex());
1118         assertEquals(3, installSnapshot.getTotalChunks());
1119     }
1120
1121     @Test
1122     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1123         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1124
1125         MockRaftActorContext actorContext = createActorContextWithFollower();
1126
1127         final int commitIndex = 3;
1128         final int snapshotIndex = 2;
1129         final int snapshotTerm = 1;
1130         final int currentTerm = 2;
1131
1132         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1133             @Override
1134             public int getSnapshotChunkSize() {
1135                 return 50;
1136             }
1137         });
1138
1139         actorContext.setCommitIndex(commitIndex);
1140
1141         leader = new Leader(actorContext);
1142
1143         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1144         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1145
1146         Map<String, String> leadersSnapshot = new HashMap<>();
1147         leadersSnapshot.put("1", "A");
1148         leadersSnapshot.put("2", "B");
1149         leadersSnapshot.put("3", "C");
1150
1151         // set the snapshot variables in replicatedlog
1152         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1153         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1154         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1155
1156         ByteString bs = toByteString(leadersSnapshot);
1157         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1158                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1159                 -1, null, null);
1160
1161         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1162
1163         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1164                 InstallSnapshot.class);
1165
1166         assertEquals(1, installSnapshot.getChunkIndex());
1167         assertEquals(3, installSnapshot.getTotalChunks());
1168         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1169                 installSnapshot.getLastChunkHashCode().get().intValue());
1170
1171         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1172
1173         followerActor.underlyingActor().clear();
1174
1175         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1176                 FOLLOWER_ID, 1, true));
1177
1178         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1179
1180         assertEquals(2, installSnapshot.getChunkIndex());
1181         assertEquals(3, installSnapshot.getTotalChunks());
1182         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1183     }
1184
1185     @Test
1186     public void testLeaderInstallSnapshotState() throws IOException {
1187         logStart("testLeaderInstallSnapshotState");
1188
1189         Map<String, String> leadersSnapshot = new HashMap<>();
1190         leadersSnapshot.put("1", "A");
1191         leadersSnapshot.put("2", "B");
1192         leadersSnapshot.put("3", "C");
1193
1194         ByteString bs = toByteString(leadersSnapshot);
1195         byte[] barray = bs.toByteArray();
1196
1197         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1198         fts.setSnapshotBytes(ByteSource.wrap(barray));
1199
1200         assertEquals(bs.size(), barray.length);
1201
1202         int chunkIndex = 0;
1203         for (int i = 0; i < barray.length; i = i + 50) {
1204             int length = i + 50;
1205             chunkIndex++;
1206
1207             if (i + 50 > barray.length) {
1208                 length = barray.length;
1209             }
1210
1211             byte[] chunk = fts.getNextChunk();
1212             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1213             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1214
1215             fts.markSendStatus(true);
1216             if (!fts.isLastChunk(chunkIndex)) {
1217                 fts.incrementChunkIndex();
1218             }
1219         }
1220
1221         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1222         fts.close();
1223     }
1224
1225     @Override
1226     protected Leader createBehavior(final RaftActorContext actorContext) {
1227         return new Leader(actorContext);
1228     }
1229
1230     @Override
1231     protected MockRaftActorContext createActorContext() {
1232         return createActorContext(leaderActor);
1233     }
1234
1235     @Override
1236     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1237         return createActorContext(LEADER_ID, actorRef);
1238     }
1239
1240     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1241         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1242         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1243         configParams.setElectionTimeoutFactor(100000);
1244         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1245         context.setConfigParams(configParams);
1246         context.setPayloadVersion(payloadVersion);
1247         return context;
1248     }
1249
1250     private MockRaftActorContext createActorContextWithFollower() {
1251         MockRaftActorContext actorContext = createActorContext();
1252         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1253                 followerActor.path().toString()).build());
1254         return actorContext;
1255     }
1256
1257     private MockRaftActorContext createFollowerActorContextWithLeader() {
1258         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1259         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1260         followerConfig.setElectionTimeoutFactor(10000);
1261         followerActorContext.setConfigParams(followerConfig);
1262         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1263         return followerActorContext;
1264     }
1265
1266     @Test
1267     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1268         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1269
1270         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1271
1272         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1273
1274         Follower follower = new Follower(followerActorContext);
1275         followerActor.underlyingActor().setBehavior(follower);
1276         followerActorContext.setCurrentBehavior(follower);
1277
1278         Map<String, String> peerAddresses = new HashMap<>();
1279         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1280
1281         leaderActorContext.setPeerAddresses(peerAddresses);
1282
1283         leaderActorContext.getReplicatedLog().removeFrom(0);
1284
1285         //create 3 entries
1286         leaderActorContext.setReplicatedLog(
1287                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1288
1289         leaderActorContext.setCommitIndex(1);
1290
1291         followerActorContext.getReplicatedLog().removeFrom(0);
1292
1293         // follower too has the exact same log entries and has the same commit index
1294         followerActorContext.setReplicatedLog(
1295                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1296
1297         followerActorContext.setCommitIndex(1);
1298
1299         leader = new Leader(leaderActorContext);
1300         leaderActorContext.setCurrentBehavior(leader);
1301
1302         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1303
1304         assertEquals(-1, appendEntries.getLeaderCommit());
1305         assertEquals(0, appendEntries.getEntries().size());
1306         assertEquals(0, appendEntries.getPrevLogIndex());
1307
1308         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1309                 leaderActor, AppendEntriesReply.class);
1310
1311         assertEquals(2, appendEntriesReply.getLogLastIndex());
1312         assertEquals(1, appendEntriesReply.getLogLastTerm());
1313
1314         // follower returns its next index
1315         assertEquals(2, appendEntriesReply.getLogLastIndex());
1316         assertEquals(1, appendEntriesReply.getLogLastTerm());
1317
1318         follower.close();
1319     }
1320
1321     @Test
1322     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1323         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1324
1325         final MockRaftActorContext leaderActorContext = createActorContext();
1326
1327         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1328         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1329
1330         Follower follower = new Follower(followerActorContext);
1331         followerActor.underlyingActor().setBehavior(follower);
1332         followerActorContext.setCurrentBehavior(follower);
1333
1334         Map<String, String> leaderPeerAddresses = new HashMap<>();
1335         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1336
1337         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1338
1339         leaderActorContext.getReplicatedLog().removeFrom(0);
1340
1341         leaderActorContext.setReplicatedLog(
1342                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1343
1344         leaderActorContext.setCommitIndex(1);
1345
1346         followerActorContext.getReplicatedLog().removeFrom(0);
1347
1348         followerActorContext.setReplicatedLog(
1349                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1350
1351         // follower has the same log entries but its commit index > leaders commit index
1352         followerActorContext.setCommitIndex(2);
1353
1354         leader = new Leader(leaderActorContext);
1355
1356         // Initial heartbeat
1357         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1358
1359         assertEquals(-1, appendEntries.getLeaderCommit());
1360         assertEquals(0, appendEntries.getEntries().size());
1361         assertEquals(0, appendEntries.getPrevLogIndex());
1362
1363         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1364                 leaderActor, AppendEntriesReply.class);
1365
1366         assertEquals(2, appendEntriesReply.getLogLastIndex());
1367         assertEquals(1, appendEntriesReply.getLogLastTerm());
1368
1369         leaderActor.underlyingActor().setBehavior(follower);
1370         leader.handleMessage(followerActor, appendEntriesReply);
1371
1372         leaderActor.underlyingActor().clear();
1373         followerActor.underlyingActor().clear();
1374
1375         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1376                 TimeUnit.MILLISECONDS);
1377
1378         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1379
1380         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1381
1382         assertEquals(2, appendEntries.getLeaderCommit());
1383         assertEquals(0, appendEntries.getEntries().size());
1384         assertEquals(2, appendEntries.getPrevLogIndex());
1385
1386         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1387
1388         assertEquals(2, appendEntriesReply.getLogLastIndex());
1389         assertEquals(1, appendEntriesReply.getLogLastTerm());
1390
1391         assertEquals(2, followerActorContext.getCommitIndex());
1392
1393         follower.close();
1394     }
1395
1396     @Test
1397     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1398         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1399
1400         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1401         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1402                 new FiniteDuration(1000, TimeUnit.SECONDS));
1403
1404         leaderActorContext.setReplicatedLog(
1405                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1406         long leaderCommitIndex = 2;
1407         leaderActorContext.setCommitIndex(leaderCommitIndex);
1408         leaderActorContext.setLastApplied(leaderCommitIndex);
1409
1410         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1411         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1412
1413         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1414
1415         followerActorContext.setReplicatedLog(
1416                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1417         followerActorContext.setCommitIndex(0);
1418         followerActorContext.setLastApplied(0);
1419
1420         Follower follower = new Follower(followerActorContext);
1421         followerActor.underlyingActor().setBehavior(follower);
1422
1423         leader = new Leader(leaderActorContext);
1424
1425         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1426         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1427                 AppendEntriesReply.class);
1428
1429         MessageCollectorActor.clearMessages(followerActor);
1430         MessageCollectorActor.clearMessages(leaderActor);
1431
1432         // Verify initial AppendEntries sent.
1433         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1434         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1435         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1436
1437         leaderActor.underlyingActor().setBehavior(leader);
1438
1439         leader.handleMessage(followerActor, appendEntriesReply);
1440
1441         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1442         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1443
1444         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1445         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1446         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1447
1448         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1449         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1450                 appendEntries.getEntries().get(0).getData());
1451         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1452         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1453                 appendEntries.getEntries().get(1).getData());
1454
1455         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1456         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1457
1458         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1459
1460         ApplyState applyState = applyStateList.get(0);
1461         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1462         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1463         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1464                 applyState.getReplicatedLogEntry().getData());
1465
1466         applyState = applyStateList.get(1);
1467         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1468         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1469         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1470                 applyState.getReplicatedLogEntry().getData());
1471
1472         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1473         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1474     }
1475
1476     @Test
1477     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1478         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1479
1480         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1481         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1482                 new FiniteDuration(1000, TimeUnit.SECONDS));
1483
1484         leaderActorContext.setReplicatedLog(
1485                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1486         long leaderCommitIndex = 1;
1487         leaderActorContext.setCommitIndex(leaderCommitIndex);
1488         leaderActorContext.setLastApplied(leaderCommitIndex);
1489
1490         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1491         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1492
1493         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1494
1495         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1496         followerActorContext.setCommitIndex(-1);
1497         followerActorContext.setLastApplied(-1);
1498
1499         Follower follower = new Follower(followerActorContext);
1500         followerActor.underlyingActor().setBehavior(follower);
1501         followerActorContext.setCurrentBehavior(follower);
1502
1503         leader = new Leader(leaderActorContext);
1504
1505         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1506         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1507                 AppendEntriesReply.class);
1508
1509         MessageCollectorActor.clearMessages(followerActor);
1510         MessageCollectorActor.clearMessages(leaderActor);
1511
1512         // Verify initial AppendEntries sent with the leader's current commit index.
1513         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1514         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1515         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1516
1517         leaderActor.underlyingActor().setBehavior(leader);
1518         leaderActorContext.setCurrentBehavior(leader);
1519
1520         leader.handleMessage(followerActor, appendEntriesReply);
1521
1522         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1523         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1524
1525         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1526         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1527         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1528
1529         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1530         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1531                 appendEntries.getEntries().get(0).getData());
1532         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1533         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1534                 appendEntries.getEntries().get(1).getData());
1535
1536         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1537         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1538
1539         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1540
1541         ApplyState applyState = applyStateList.get(0);
1542         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1543         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1544         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1545                 applyState.getReplicatedLogEntry().getData());
1546
1547         applyState = applyStateList.get(1);
1548         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1549         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1550         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1551                 applyState.getReplicatedLogEntry().getData());
1552
1553         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1554         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1555     }
1556
1557     @Test
1558     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1559         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1560
1561         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1562         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1563                 new FiniteDuration(1000, TimeUnit.SECONDS));
1564
1565         leaderActorContext.setReplicatedLog(
1566                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1567         long leaderCommitIndex = 1;
1568         leaderActorContext.setCommitIndex(leaderCommitIndex);
1569         leaderActorContext.setLastApplied(leaderCommitIndex);
1570
1571         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1572         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1573
1574         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1575
1576         followerActorContext.setReplicatedLog(
1577                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1578         followerActorContext.setCommitIndex(-1);
1579         followerActorContext.setLastApplied(-1);
1580
1581         Follower follower = new Follower(followerActorContext);
1582         followerActor.underlyingActor().setBehavior(follower);
1583         followerActorContext.setCurrentBehavior(follower);
1584
1585         leader = new Leader(leaderActorContext);
1586
1587         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1588         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1589                 AppendEntriesReply.class);
1590
1591         MessageCollectorActor.clearMessages(followerActor);
1592         MessageCollectorActor.clearMessages(leaderActor);
1593
1594         // Verify initial AppendEntries sent with the leader's current commit index.
1595         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1596         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1597         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1598
1599         leaderActor.underlyingActor().setBehavior(leader);
1600         leaderActorContext.setCurrentBehavior(leader);
1601
1602         leader.handleMessage(followerActor, appendEntriesReply);
1603
1604         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1605         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1606
1607         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1608         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1609         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1610
1611         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1612         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1613         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1614                 appendEntries.getEntries().get(0).getData());
1615         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1616         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1617         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1618                 appendEntries.getEntries().get(1).getData());
1619
1620         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1621         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1622
1623         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1624
1625         ApplyState applyState = applyStateList.get(0);
1626         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1627         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1628         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1629                 applyState.getReplicatedLogEntry().getData());
1630
1631         applyState = applyStateList.get(1);
1632         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1633         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1634         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1635                 applyState.getReplicatedLogEntry().getData());
1636
1637         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1638         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1639         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1640     }
1641
1642     @Test
1643     public void testHandleAppendEntriesReplyWithNewerTerm() {
1644         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1645
1646         MockRaftActorContext leaderActorContext = createActorContext();
1647         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1648                 new FiniteDuration(10000, TimeUnit.SECONDS));
1649
1650         leaderActorContext.setReplicatedLog(
1651                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1652
1653         leader = new Leader(leaderActorContext);
1654         leaderActor.underlyingActor().setBehavior(leader);
1655         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1656
1657         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1658                 AppendEntriesReply.class);
1659
1660         assertEquals(false, appendEntriesReply.isSuccess());
1661         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1662
1663         MessageCollectorActor.clearMessages(leaderActor);
1664     }
1665
1666     @Test
1667     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1668         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1669
1670         MockRaftActorContext leaderActorContext = createActorContext();
1671         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1672                 new FiniteDuration(10000, TimeUnit.SECONDS));
1673
1674         leaderActorContext.setReplicatedLog(
1675                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1676         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1677
1678         leader = new Leader(leaderActorContext);
1679         leaderActor.underlyingActor().setBehavior(leader);
1680         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1681
1682         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1683                 AppendEntriesReply.class);
1684
1685         assertEquals(false, appendEntriesReply.isSuccess());
1686         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1687
1688         MessageCollectorActor.clearMessages(leaderActor);
1689     }
1690
1691     @Test
1692     public void testHandleAppendEntriesReplySuccess() throws Exception {
1693         logStart("testHandleAppendEntriesReplySuccess");
1694
1695         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1696
1697         leaderActorContext.setReplicatedLog(
1698                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1699
1700         leaderActorContext.setCommitIndex(1);
1701         leaderActorContext.setLastApplied(1);
1702         leaderActorContext.getTermInformation().update(1, "leader");
1703
1704         leader = new Leader(leaderActorContext);
1705
1706         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1707
1708         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1709         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1710
1711         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1712
1713         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1714
1715         assertEquals(RaftState.Leader, raftActorBehavior.state());
1716
1717         assertEquals(2, leaderActorContext.getCommitIndex());
1718
1719         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1720                 leaderActor, ApplyJournalEntries.class);
1721
1722         assertEquals(2, leaderActorContext.getLastApplied());
1723
1724         assertEquals(2, applyJournalEntries.getToIndex());
1725
1726         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1727                 ApplyState.class);
1728
1729         assertEquals(1,applyStateList.size());
1730
1731         ApplyState applyState = applyStateList.get(0);
1732
1733         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1734
1735         assertEquals(2, followerInfo.getMatchIndex());
1736         assertEquals(3, followerInfo.getNextIndex());
1737         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1738         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1739     }
1740
1741     @Test
1742     public void testHandleAppendEntriesReplyUnknownFollower() {
1743         logStart("testHandleAppendEntriesReplyUnknownFollower");
1744
1745         MockRaftActorContext leaderActorContext = createActorContext();
1746
1747         leader = new Leader(leaderActorContext);
1748
1749         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1750
1751         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1752
1753         assertEquals(RaftState.Leader, raftActorBehavior.state());
1754     }
1755
1756     @Test
1757     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1758         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1759
1760         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1761         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1762                 new FiniteDuration(1000, TimeUnit.SECONDS));
1763         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1764
1765         leaderActorContext.setReplicatedLog(
1766                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1767         long leaderCommitIndex = 3;
1768         leaderActorContext.setCommitIndex(leaderCommitIndex);
1769         leaderActorContext.setLastApplied(leaderCommitIndex);
1770
1771         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1772         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1773         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1774         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1775
1776         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1777
1778         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1779         followerActorContext.setCommitIndex(-1);
1780         followerActorContext.setLastApplied(-1);
1781
1782         Follower follower = new Follower(followerActorContext);
1783         followerActor.underlyingActor().setBehavior(follower);
1784         followerActorContext.setCurrentBehavior(follower);
1785
1786         leader = new Leader(leaderActorContext);
1787
1788         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1789         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1790                 AppendEntriesReply.class);
1791
1792         MessageCollectorActor.clearMessages(followerActor);
1793         MessageCollectorActor.clearMessages(leaderActor);
1794
1795         // Verify initial AppendEntries sent with the leader's current commit index.
1796         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1797         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1798         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1799
1800         leaderActor.underlyingActor().setBehavior(leader);
1801         leaderActorContext.setCurrentBehavior(leader);
1802
1803         leader.handleMessage(followerActor, appendEntriesReply);
1804
1805         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1806                 AppendEntries.class, 2);
1807         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1808
1809         appendEntries = appendEntriesList.get(0);
1810         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1811         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1812         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1813
1814         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1815         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1816                 appendEntries.getEntries().get(0).getData());
1817         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1818         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1819                 appendEntries.getEntries().get(1).getData());
1820
1821         appendEntries = appendEntriesList.get(1);
1822         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1823         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1824         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1825
1826         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1827         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1828                 appendEntries.getEntries().get(0).getData());
1829         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1830         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1831                 appendEntries.getEntries().get(1).getData());
1832
1833         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1834         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1835
1836         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1837
1838         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1839         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1840     }
1841
1842     @Test
1843     public void testHandleRequestVoteReply() {
1844         logStart("testHandleRequestVoteReply");
1845
1846         MockRaftActorContext leaderActorContext = createActorContext();
1847
1848         leader = new Leader(leaderActorContext);
1849
1850         // Should be a no-op.
1851         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1852                 new RequestVoteReply(1, true));
1853
1854         assertEquals(RaftState.Leader, raftActorBehavior.state());
1855
1856         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1857
1858         assertEquals(RaftState.Leader, raftActorBehavior.state());
1859     }
1860
1861     @Test
1862     public void testIsolatedLeaderCheckNoFollowers() {
1863         logStart("testIsolatedLeaderCheckNoFollowers");
1864
1865         MockRaftActorContext leaderActorContext = createActorContext();
1866
1867         leader = new Leader(leaderActorContext);
1868         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1869         assertTrue(newBehavior instanceof Leader);
1870     }
1871
1872     @Test
1873     public void testIsolatedLeaderCheckNoVotingFollowers() {
1874         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1875
1876         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1877         Follower follower = new Follower(followerActorContext);
1878         followerActor.underlyingActor().setBehavior(follower);
1879
1880         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1881         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1882                 new FiniteDuration(1000, TimeUnit.SECONDS));
1883         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1884
1885         leader = new Leader(leaderActorContext);
1886         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1887         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1888         assertTrue("Expected Leader", newBehavior instanceof Leader);
1889     }
1890
1891     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1892         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1893         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1894
1895         MockRaftActorContext leaderActorContext = createActorContext();
1896
1897         Map<String, String> peerAddresses = new HashMap<>();
1898         peerAddresses.put("follower-1", followerActor1.path().toString());
1899         peerAddresses.put("follower-2", followerActor2.path().toString());
1900
1901         leaderActorContext.setPeerAddresses(peerAddresses);
1902         leaderActorContext.setRaftPolicy(raftPolicy);
1903
1904         leader = new Leader(leaderActorContext);
1905
1906         leader.markFollowerActive("follower-1");
1907         leader.markFollowerActive("follower-2");
1908         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1909         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1910
1911         // kill 1 follower and verify if that got killed
1912         final JavaTestKit probe = new JavaTestKit(getSystem());
1913         probe.watch(followerActor1);
1914         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1915         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1916         assertEquals(termMsg1.getActor(), followerActor1);
1917
1918         leader.markFollowerInActive("follower-1");
1919         leader.markFollowerActive("follower-2");
1920         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1921         assertTrue("Behavior not instance of Leader when majority of followers are active",
1922                 newBehavior instanceof Leader);
1923
1924         // kill 2nd follower and leader should change to Isolated leader
1925         followerActor2.tell(PoisonPill.getInstance(), null);
1926         probe.watch(followerActor2);
1927         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1928         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1929         assertEquals(termMsg2.getActor(), followerActor2);
1930
1931         leader.markFollowerInActive("follower-2");
1932         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1933     }
1934
1935     @Test
1936     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1937         logStart("testIsolatedLeaderCheckTwoFollowers");
1938
1939         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1940
1941         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1942             newBehavior instanceof IsolatedLeader);
1943     }
1944
1945     @Test
1946     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1947         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1948
1949         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1950
1951         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1952                 newBehavior instanceof Leader);
1953     }
1954
1955     @Test
1956     public void testLaggingFollowerStarvation() throws Exception {
1957         logStart("testLaggingFollowerStarvation");
1958
1959         String leaderActorId = actorFactory.generateActorId("leader");
1960         String follower1ActorId = actorFactory.generateActorId("follower");
1961         String follower2ActorId = actorFactory.generateActorId("follower");
1962
1963         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1964         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1965
1966         MockRaftActorContext leaderActorContext =
1967                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1968
1969         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1970         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1971         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1972
1973         leaderActorContext.setConfigParams(configParams);
1974
1975         leaderActorContext.setReplicatedLog(
1976                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1977
1978         Map<String, String> peerAddresses = new HashMap<>();
1979         peerAddresses.put(follower1ActorId,
1980                 follower1Actor.path().toString());
1981         peerAddresses.put(follower2ActorId,
1982                 follower2Actor.path().toString());
1983
1984         leaderActorContext.setPeerAddresses(peerAddresses);
1985         leaderActorContext.getTermInformation().update(1, leaderActorId);
1986
1987         leader = createBehavior(leaderActorContext);
1988
1989         leaderActor.underlyingActor().setBehavior(leader);
1990
1991         for (int i = 1; i < 6; i++) {
1992             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1993             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1994                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1995             assertTrue(newBehavior == leader);
1996             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1997         }
1998
1999         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2000         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2001
2002         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2003                 heartbeats.size() > 1);
2004
2005         // Check if follower-2 got AppendEntries during this time and was not starved
2006         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2007
2008         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2009                 appendEntries.size() > 1);
2010     }
2011
2012     @Test
2013     public void testReplicationConsensusWithNonVotingFollower() {
2014         logStart("testReplicationConsensusWithNonVotingFollower");
2015
2016         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2017         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2018                 new FiniteDuration(1000, TimeUnit.SECONDS));
2019
2020         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2021         leaderActorContext.setCommitIndex(-1);
2022         leaderActorContext.setLastApplied(-1);
2023
2024         String nonVotingFollowerId = "nonvoting-follower";
2025         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
2026                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2027
2028         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2029                 VotingState.NON_VOTING);
2030
2031         leader = new Leader(leaderActorContext);
2032         leaderActorContext.setCurrentBehavior(leader);
2033
2034         // Ignore initial heartbeats
2035         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2036         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2037
2038         MessageCollectorActor.clearMessages(followerActor);
2039         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2040         MessageCollectorActor.clearMessages(leaderActor);
2041
2042         // Send a Replicate message and wait for AppendEntries.
2043         sendReplicate(leaderActorContext, 0);
2044
2045         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2046         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2047
2048         // Send reply only from the voting follower and verify consensus via ApplyState.
2049         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2050
2051         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2052
2053         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2054
2055         MessageCollectorActor.clearMessages(followerActor);
2056         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2057         MessageCollectorActor.clearMessages(leaderActor);
2058
2059         // Send another Replicate message
2060         sendReplicate(leaderActorContext, 1);
2061
2062         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2063         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2064                 AppendEntries.class);
2065         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2066         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2067
2068         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2069         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2070
2071         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2072
2073         // Send reply from the voting follower and verify consensus.
2074         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2075
2076         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2077     }
2078
2079     @Test
2080     public void testTransferLeadershipWithFollowerInSync() {
2081         logStart("testTransferLeadershipWithFollowerInSync");
2082
2083         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2084         leaderActorContext.setLastApplied(-1);
2085         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2086                 new FiniteDuration(1000, TimeUnit.SECONDS));
2087         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2088
2089         leader = new Leader(leaderActorContext);
2090         leaderActorContext.setCurrentBehavior(leader);
2091
2092         // Initial heartbeat
2093         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2094         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2095         MessageCollectorActor.clearMessages(followerActor);
2096
2097         sendReplicate(leaderActorContext, 0);
2098         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099
2100         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2101         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2102         MessageCollectorActor.clearMessages(followerActor);
2103
2104         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2105         leader.transferLeadership(mockTransferCohort);
2106
2107         verify(mockTransferCohort, never()).transferComplete();
2108         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2109         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2111
2112         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2113         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2114
2115         // Leader should force an election timeout
2116         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2117
2118         verify(mockTransferCohort).transferComplete();
2119     }
2120
2121     @Test
2122     public void testTransferLeadershipWithEmptyLog() {
2123         logStart("testTransferLeadershipWithEmptyLog");
2124
2125         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2126         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2127                 new FiniteDuration(1000, TimeUnit.SECONDS));
2128         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2129
2130         leader = new Leader(leaderActorContext);
2131         leaderActorContext.setCurrentBehavior(leader);
2132
2133         // Initial heartbeat
2134         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2135         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2136         MessageCollectorActor.clearMessages(followerActor);
2137
2138         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2139         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2140         leader.transferLeadership(mockTransferCohort);
2141
2142         verify(mockTransferCohort, never()).transferComplete();
2143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2145
2146         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2147         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2148
2149         // Leader should force an election timeout
2150         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2151
2152         verify(mockTransferCohort).transferComplete();
2153     }
2154
2155     @Test
2156     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2157         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2158
2159         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2162
2163         leader = new Leader(leaderActorContext);
2164         leaderActorContext.setCurrentBehavior(leader);
2165
2166         // Initial heartbeat
2167         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2168         MessageCollectorActor.clearMessages(followerActor);
2169
2170         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2171         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2172         leader.transferLeadership(mockTransferCohort);
2173
2174         verify(mockTransferCohort, never()).transferComplete();
2175
2176         // Sync up the follower.
2177         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2179         MessageCollectorActor.clearMessages(followerActor);
2180
2181         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2182                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2183         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2184         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2185         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2186
2187         // Leader should force an election timeout
2188         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2189
2190         verify(mockTransferCohort).transferComplete();
2191     }
2192
2193     @Test
2194     public void testTransferLeadershipWithFollowerSyncTimeout() {
2195         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2196
2197         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2198         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2199                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2200         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2201         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2202
2203         leader = new Leader(leaderActorContext);
2204         leaderActorContext.setCurrentBehavior(leader);
2205
2206         // Initial heartbeat
2207         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2208         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2209         MessageCollectorActor.clearMessages(followerActor);
2210
2211         sendReplicate(leaderActorContext, 0);
2212         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2213
2214         MessageCollectorActor.clearMessages(followerActor);
2215
2216         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2217         leader.transferLeadership(mockTransferCohort);
2218
2219         verify(mockTransferCohort, never()).transferComplete();
2220
2221         // Send heartbeats to time out the transfer.
2222         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2223             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2224                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2225             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2226         }
2227
2228         verify(mockTransferCohort).abortTransfer();
2229         verify(mockTransferCohort, never()).transferComplete();
2230         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2231     }
2232
2233     @Override
2234     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2235             ActorRef actorRef, RaftRPC rpc) throws Exception {
2236         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2237         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2238     }
2239
2240     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2241
2242         private final long electionTimeOutIntervalMillis;
2243         private final int snapshotChunkSize;
2244
2245         MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2246             super();
2247             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2248             this.snapshotChunkSize = snapshotChunkSize;
2249         }
2250
2251         @Override
2252         public FiniteDuration getElectionTimeOutInterval() {
2253             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2254         }
2255
2256         @Override
2257         public int getSnapshotChunkSize() {
2258             return snapshotChunkSize;
2259         }
2260     }
2261 }