Checkstyle: fix ParenPad violations
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.google.protobuf.ByteString;
31 import java.io.IOException;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.After;
39 import org.junit.Test;
40 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
41 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
42 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
43 import org.opendaylight.controller.cluster.raft.RaftActorContext;
44 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
45 import org.opendaylight.controller.cluster.raft.RaftState;
46 import org.opendaylight.controller.cluster.raft.RaftVersions;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
48 import org.opendaylight.controller.cluster.raft.VotingState;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
56 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
63 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
65 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
66 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
67 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
68 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
69 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
70 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
71 import org.opendaylight.yangtools.concepts.Identifier;
72 import scala.concurrent.duration.FiniteDuration;
73
74 public class LeaderTest extends AbstractLeaderTest<Leader> {
75
    // Member id the tests pass as the follower's id (AppendEntriesReply, leader.getFollower()).
    static final String FOLLOWER_ID = "follower";
    // Member id for the leader; not referenced in the tests visible here - presumably used by
    // the actor-context helpers or other test classes (TODO confirm).
    public static final String LEADER_ID = "leader";

    // Test actor passed as the sender of leader-side messages (SendHeartBeat, Replicate); some
    // tests also collect the ApplyState messages delivered to it. actorFactory is not declared
    // in this class - presumably inherited from the test base class.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; tests inspect the AppendEntries/InstallSnapshot
    // messages it collects and clear them via underlyingActor().clear().
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Behavior under test; assigned by each test and closed in tearDown() when non-null.
    private Leader leader;
    // Payload version set on the actor context and expected back in the AppendEntries messages.
    private final short payloadVersion = 5;
87
88     @Override
89     @After
90     public void tearDown() throws Exception {
91         if (leader != null) {
92             leader.close();
93         }
94
95         super.tearDown();
96     }
97
98     @Test
99     public void testHandleMessageForUnknownMessage() throws Exception {
100         logStart("testHandleMessageForUnknownMessage");
101
102         leader = new Leader(createActorContext());
103
104         // handle message should null when it receives an unknown message
105         assertNull(leader.handleMessage(followerActor, "foo"));
106     }
107
108     @Test
109     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
110         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
111
112         MockRaftActorContext actorContext = createActorContextWithFollower();
113         actorContext.setCommitIndex(-1);
114         actorContext.setPayloadVersion(payloadVersion);
115
116         long term = 1;
117         actorContext.getTermInformation().update(term, "");
118
119         leader = new Leader(actorContext);
120         actorContext.setCurrentBehavior(leader);
121
122         // Leader should send an immediate heartbeat with no entries as follower is inactive.
123         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
124         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
125         assertEquals("getTerm", term, appendEntries.getTerm());
126         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
127         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
128         assertEquals("Entries size", 0, appendEntries.getEntries().size());
129         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
130
131         // The follower would normally reply - simulate that explicitly here.
132         leader.handleMessage(followerActor, new AppendEntriesReply(
133                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
134         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
135
136         followerActor.underlyingActor().clear();
137
138         // Sleep for the heartbeat interval so AppendEntries is sent.
139         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
140                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
141
142         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
143
144         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
146         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
147         assertEquals("Entries size", 1, appendEntries.getEntries().size());
148         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
149         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
150         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
151     }
152
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
155         return sendReplicate(actorContext, 1, index);
156     }
157
158     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
159         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
160         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
161         actorContext.getReplicatedLog().append(newEntry);
162         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
163     }
164
165     @Test
166     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
167         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
168
169         MockRaftActorContext actorContext = createActorContextWithFollower();
170
171         long term = 1;
172         actorContext.getTermInformation().update(term, "");
173
174         leader = new Leader(actorContext);
175
176         // Leader will send an immediate heartbeat - ignore it.
177         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
178
179         // The follower would normally reply - simulate that explicitly here.
180         long lastIndex = actorContext.getReplicatedLog().lastIndex();
181         leader.handleMessage(followerActor, new AppendEntriesReply(
182                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
183         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
184
185         followerActor.underlyingActor().clear();
186
187         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
188
189         // State should not change
190         assertTrue(raftBehavior instanceof Leader);
191
192         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
193         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
194         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
195         assertEquals("Entries size", 1, appendEntries.getEntries().size());
196         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
197         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
198         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
199         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
200     }
201
202     @Test
203     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
204         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
205
206         MockRaftActorContext actorContext = createActorContextWithFollower();
207         actorContext.setCommitIndex(-1);
208         actorContext.setLastApplied(-1);
209
210         // The raft context is initialized with a couple log entries. However the commitIndex
211         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
212         // committed and applied. Now it regains leadership with a higher term (2).
213         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
214         long newTerm = prevTerm + 1;
215         actorContext.getTermInformation().update(newTerm, "");
216
217         leader = new Leader(actorContext);
218         actorContext.setCurrentBehavior(leader);
219
220         // Leader will send an immediate heartbeat - ignore it.
221         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
222
223         // The follower replies with the leader's current last index and term, simulating that it is
224         // up to date with the leader.
225         long lastIndex = actorContext.getReplicatedLog().lastIndex();
226         leader.handleMessage(followerActor, new AppendEntriesReply(
227                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
228
229         // The commit index should not get updated even though consensus was reached. This is b/c the
230         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
231         // from previous terms by counting replicas".
232         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
233
234         followerActor.underlyingActor().clear();
235
236         // Now replicate a new entry with the new term 2.
237         long newIndex = lastIndex + 1;
238         sendReplicate(actorContext, newTerm, newIndex);
239
240         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
241         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
242         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
243         assertEquals("Entries size", 1, appendEntries.getEntries().size());
244         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
245         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
246         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
247
248         // The follower replies with success. The leader should now update the commit index to the new index
249         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
250         // prior entries are committed indirectly".
251         leader.handleMessage(followerActor, new AppendEntriesReply(
252                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
253
254         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
255     }
256
257     @Test
258     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
259         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
260
261         MockRaftActorContext actorContext = createActorContextWithFollower();
262         actorContext.setRaftPolicy(createRaftPolicy(true, true));
263
264         long term = 1;
265         actorContext.getTermInformation().update(term, "");
266
267         leader = new Leader(actorContext);
268
269         // Leader will send an immediate heartbeat - ignore it.
270         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
271
272         // The follower would normally reply - simulate that explicitly here.
273         long lastIndex = actorContext.getReplicatedLog().lastIndex();
274         leader.handleMessage(followerActor, new AppendEntriesReply(
275                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
276         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
277
278         followerActor.underlyingActor().clear();
279
280         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
281
282         // State should not change
283         assertTrue(raftBehavior instanceof Leader);
284
285         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
286         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
287         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
288         assertEquals("Entries size", 1, appendEntries.getEntries().size());
289         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
290         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
291         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
292         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
293     }
294
295     @Test
296     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
297         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
298
299         MockRaftActorContext actorContext = createActorContextWithFollower();
300         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
301             @Override
302             public FiniteDuration getHeartBeatInterval() {
303                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
304             }
305         });
306
307         long term = 1;
308         actorContext.getTermInformation().update(term, "");
309
310         leader = new Leader(actorContext);
311
312         // Leader will send an immediate heartbeat - ignore it.
313         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
314
315         // The follower would normally reply - simulate that explicitly here.
316         long lastIndex = actorContext.getReplicatedLog().lastIndex();
317         leader.handleMessage(followerActor, new AppendEntriesReply(
318                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
319         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
320
321         followerActor.underlyingActor().clear();
322
323         for (int i = 0; i < 5; i++) {
324             sendReplicate(actorContext, lastIndex + i + 1);
325         }
326
327         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
328         // We expect only 1 message to be sent because of two reasons,
329         // - an append entries reply was not received
330         // - the heartbeat interval has not expired
331         // In this scenario if multiple messages are sent they would likely be duplicates
332         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
333     }
334
335     @Test
336     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
337         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
338
339         MockRaftActorContext actorContext = createActorContextWithFollower();
340         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
341             @Override
342             public FiniteDuration getHeartBeatInterval() {
343                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
344             }
345         });
346
347         long term = 1;
348         actorContext.getTermInformation().update(term, "");
349
350         leader = new Leader(actorContext);
351
352         // Leader will send an immediate heartbeat - ignore it.
353         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
354
355         // The follower would normally reply - simulate that explicitly here.
356         long lastIndex = actorContext.getReplicatedLog().lastIndex();
357         leader.handleMessage(followerActor, new AppendEntriesReply(
358                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
359         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
360
361         followerActor.underlyingActor().clear();
362
363         for (int i = 0; i < 3; i++) {
364             sendReplicate(actorContext, lastIndex + i + 1);
365             leader.handleMessage(followerActor, new AppendEntriesReply(
366                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
367
368         }
369
370         for (int i = 3; i < 5; i++) {
371             sendReplicate(actorContext, lastIndex + i + 1);
372         }
373
374         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
375         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
376         // get sent to the follower - but not the 5th
377         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
378
379         for (int i = 0; i < 4; i++) {
380             long expected = allMessages.get(i).getEntries().get(0).getIndex();
381             assertEquals(expected, i + 2);
382         }
383     }
384
385     @Test
386     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
387         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
388
389         MockRaftActorContext actorContext = createActorContextWithFollower();
390         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
391             @Override
392             public FiniteDuration getHeartBeatInterval() {
393                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
394             }
395         });
396
397         long term = 1;
398         actorContext.getTermInformation().update(term, "");
399
400         leader = new Leader(actorContext);
401
402         // Leader will send an immediate heartbeat - ignore it.
403         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
404
405         // The follower would normally reply - simulate that explicitly here.
406         long lastIndex = actorContext.getReplicatedLog().lastIndex();
407         leader.handleMessage(followerActor, new AppendEntriesReply(
408                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
409         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
410
411         followerActor.underlyingActor().clear();
412
413         sendReplicate(actorContext, lastIndex + 1);
414
415         // Wait slightly longer than heartbeat duration
416         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
417
418         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
419
420         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
421         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
422
423         assertEquals(1, allMessages.get(0).getEntries().size());
424         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
425         assertEquals(1, allMessages.get(1).getEntries().size());
426         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
427
428     }
429
430     @Test
431     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
432         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
433
434         MockRaftActorContext actorContext = createActorContextWithFollower();
435         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
436             @Override
437             public FiniteDuration getHeartBeatInterval() {
438                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
439             }
440         });
441
442         long term = 1;
443         actorContext.getTermInformation().update(term, "");
444
445         leader = new Leader(actorContext);
446
447         // Leader will send an immediate heartbeat - ignore it.
448         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
449
450         // The follower would normally reply - simulate that explicitly here.
451         long lastIndex = actorContext.getReplicatedLog().lastIndex();
452         leader.handleMessage(followerActor, new AppendEntriesReply(
453                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
454         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
455
456         followerActor.underlyingActor().clear();
457
458         for (int i = 0; i < 3; i++) {
459             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
460             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
461         }
462
463         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
464         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
465     }
466
467     @Test
468     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
469         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
470
471         MockRaftActorContext actorContext = createActorContextWithFollower();
472         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
473             @Override
474             public FiniteDuration getHeartBeatInterval() {
475                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
476             }
477         });
478
479         long term = 1;
480         actorContext.getTermInformation().update(term, "");
481
482         leader = new Leader(actorContext);
483
484         // Leader will send an immediate heartbeat - ignore it.
485         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
486
487         // The follower would normally reply - simulate that explicitly here.
488         long lastIndex = actorContext.getReplicatedLog().lastIndex();
489         leader.handleMessage(followerActor, new AppendEntriesReply(
490                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
491         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
492
493         followerActor.underlyingActor().clear();
494
495         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
496         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
497         sendReplicate(actorContext, lastIndex + 1);
498
499         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
500         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
501
502         assertEquals(0, allMessages.get(0).getEntries().size());
503         assertEquals(1, allMessages.get(1).getEntries().size());
504     }
505
506
507     @Test
508     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
509         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
510
511         MockRaftActorContext actorContext = createActorContext();
512
513         leader = new Leader(actorContext);
514
515         actorContext.setLastApplied(0);
516
517         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
518         long term = actorContext.getTermInformation().getCurrentTerm();
519         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
520                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
521
522         actorContext.getReplicatedLog().append(newEntry);
523
524         final Identifier id = new MockIdentifier("state-id");
525         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
526                 new Replicate(leaderActor, id, newEntry, true));
527
528         // State should not change
529         assertTrue(raftBehavior instanceof Leader);
530
531         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
532
533         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
534         // one since lastApplied state is 0.
535         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
536                 leaderActor, ApplyState.class);
537         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
538
539         for (int i = 0; i <= newLogIndex - 1; i++) {
540             ApplyState applyState = applyStateList.get(i);
541             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
542             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
543         }
544
545         ApplyState last = applyStateList.get((int) newLogIndex - 1);
546         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
547         assertEquals("getIdentifier", id, last.getIdentifier());
548     }
549
550     @Test
551     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
552         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
553
554         final MockRaftActorContext actorContext = createActorContextWithFollower();
555
556         Map<String, String> leadersSnapshot = new HashMap<>();
557         leadersSnapshot.put("1", "A");
558         leadersSnapshot.put("2", "B");
559         leadersSnapshot.put("3", "C");
560
561         //clears leaders log
562         actorContext.getReplicatedLog().removeFrom(0);
563
564         final int commitIndex = 3;
565         final int snapshotIndex = 2;
566         final int snapshotTerm = 1;
567
568         // set the snapshot variables in replicatedlog
569         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
570         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
571         actorContext.setCommitIndex(commitIndex);
572         //set follower timeout to 2 mins, helps during debugging
573         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
574
575         leader = new Leader(actorContext);
576
577         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
578         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
579
580         //update follower timestamp
581         leader.markFollowerActive(FOLLOWER_ID);
582
583         ByteString bs = toByteString(leadersSnapshot);
584         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
585                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
586                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
587         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
588                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
589         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
590         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
591
592         //send first chunk and no InstallSnapshotReply received yet
593         fts.getNextChunk();
594         fts.incrementChunkIndex();
595
596         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
597                 TimeUnit.MILLISECONDS);
598
599         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
600
601         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
602
603         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
604
605         //InstallSnapshotReply received
606         fts.markSendStatus(true);
607
608         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
609
610         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
611
612         assertEquals(commitIndex, is.getLastIncludedIndex());
613     }
614
615     @Test
616     public void testSendAppendEntriesSnapshotScenario() throws Exception {
617         logStart("testSendAppendEntriesSnapshotScenario");
618
619         final MockRaftActorContext actorContext = createActorContextWithFollower();
620
621         Map<String, String> leadersSnapshot = new HashMap<>();
622         leadersSnapshot.put("1", "A");
623         leadersSnapshot.put("2", "B");
624         leadersSnapshot.put("3", "C");
625
626         //clears leaders log
627         actorContext.getReplicatedLog().removeFrom(0);
628
629         final int followersLastIndex = 2;
630         final int snapshotIndex = 3;
631         final int newEntryIndex = 4;
632         final int snapshotTerm = 1;
633         final int currentTerm = 2;
634
635         // set the snapshot variables in replicatedlog
636         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
637         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
638         actorContext.setCommitIndex(followersLastIndex);
639
640         leader = new Leader(actorContext);
641
642         // Leader will send an immediate heartbeat - ignore it.
643         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
644
645         // new entry
646         SimpleReplicatedLogEntry entry =
647                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
648                         new MockRaftActorContext.MockPayload("D"));
649
650         actorContext.getReplicatedLog().append(entry);
651
652         //update follower timestamp
653         leader.markFollowerActive(FOLLOWER_ID);
654
655         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
656         RaftActorBehavior raftBehavior = leader.handleMessage(
657                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
658
659         assertTrue(raftBehavior instanceof Leader);
660
661         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
662     }
663
664     @Test
665     public void testInitiateInstallSnapshot() throws Exception {
666         logStart("testInitiateInstallSnapshot");
667
668         MockRaftActorContext actorContext = createActorContextWithFollower();
669
670         //clears leaders log
671         actorContext.getReplicatedLog().removeFrom(0);
672
673         final int followersLastIndex = 2;
674         final int snapshotIndex = 3;
675         final int newEntryIndex = 4;
676         final int snapshotTerm = 1;
677         final int currentTerm = 2;
678
679         // set the snapshot variables in replicatedlog
680         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
681         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
682         actorContext.setLastApplied(3);
683         actorContext.setCommitIndex(followersLastIndex);
684
685         leader = new Leader(actorContext);
686
687         // Leader will send an immediate heartbeat - ignore it.
688         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689
690         // set the snapshot as absent and check if capture-snapshot is invoked.
691         leader.setSnapshot(null);
692
693         // new entry
694         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
695                 new MockRaftActorContext.MockPayload("D"));
696
697         actorContext.getReplicatedLog().append(entry);
698
699         //update follower timestamp
700         leader.markFollowerActive(FOLLOWER_ID);
701
702         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
703
704         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
705
706         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
707
708         assertEquals(3, cs.getLastAppliedIndex());
709         assertEquals(1, cs.getLastAppliedTerm());
710         assertEquals(4, cs.getLastIndex());
711         assertEquals(2, cs.getLastTerm());
712
713         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
714         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
715
716         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
717     }
718
719     @Test
720     public void testInitiateForceInstallSnapshot() throws Exception {
721         logStart("testInitiateForceInstallSnapshot");
722
723         MockRaftActorContext actorContext = createActorContextWithFollower();
724
725         final int followersLastIndex = 2;
726         final int snapshotIndex = -1;
727         final int newEntryIndex = 4;
728         final int snapshotTerm = -1;
729         final int currentTerm = 2;
730
731         // set the snapshot variables in replicatedlog
732         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734         actorContext.setLastApplied(3);
735         actorContext.setCommitIndex(followersLastIndex);
736
737         actorContext.getReplicatedLog().removeFrom(0);
738
739         leader = new Leader(actorContext);
740         actorContext.setCurrentBehavior(leader);
741
742         // Leader will send an immediate heartbeat - ignore it.
743         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
744
745         // set the snapshot as absent and check if capture-snapshot is invoked.
746         leader.setSnapshot(null);
747
748         for (int i = 0; i < 4; i++) {
749             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
750                     new MockRaftActorContext.MockPayload("X" + i)));
751         }
752
753         // new entry
754         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
755                 new MockRaftActorContext.MockPayload("D"));
756
757         actorContext.getReplicatedLog().append(entry);
758
759         //update follower timestamp
760         leader.markFollowerActive(FOLLOWER_ID);
761
762         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
763         // installed with a SendInstallSnapshot
764         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
765
766         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
767
768         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
769
770         assertEquals(3, cs.getLastAppliedIndex());
771         assertEquals(1, cs.getLastAppliedTerm());
772         assertEquals(4, cs.getLastIndex());
773         assertEquals(2, cs.getLastTerm());
774
775         // if an initiate is started again when first is in progress, it should not initiate Capture
776         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
777
778         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
779     }
780
781
782     @Test
783     public void testInstallSnapshot() throws Exception {
784         logStart("testInstallSnapshot");
785
786         final MockRaftActorContext actorContext = createActorContextWithFollower();
787
788         Map<String, String> leadersSnapshot = new HashMap<>();
789         leadersSnapshot.put("1", "A");
790         leadersSnapshot.put("2", "B");
791         leadersSnapshot.put("3", "C");
792
793         //clears leaders log
794         actorContext.getReplicatedLog().removeFrom(0);
795
796         final int lastAppliedIndex = 3;
797         final int snapshotIndex = 2;
798         final int snapshotTerm = 1;
799         final int currentTerm = 2;
800
801         // set the snapshot variables in replicatedlog
802         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805         actorContext.setCommitIndex(lastAppliedIndex);
806         actorContext.setLastApplied(lastAppliedIndex);
807
808         leader = new Leader(actorContext);
809
810         // Initial heartbeat.
811         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
812
813         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
814         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
815
816         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
817         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
818                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
819
820         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
821                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
822
823         assertTrue(raftBehavior instanceof Leader);
824
825         // check if installsnapshot gets called with the correct values.
826
827         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
828                 InstallSnapshot.class);
829
830         assertNotNull(installSnapshot.getData());
831         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
832         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
833
834         assertEquals(currentTerm, installSnapshot.getTerm());
835     }
836
837     @Test
838     public void testForceInstallSnapshot() throws Exception {
839         logStart("testForceInstallSnapshot");
840
841         final MockRaftActorContext actorContext = createActorContextWithFollower();
842
843         Map<String, String> leadersSnapshot = new HashMap<>();
844         leadersSnapshot.put("1", "A");
845         leadersSnapshot.put("2", "B");
846         leadersSnapshot.put("3", "C");
847
848         final int lastAppliedIndex = 3;
849         final int snapshotIndex = -1;
850         final int snapshotTerm = -1;
851         final int currentTerm = 2;
852
853         // set the snapshot variables in replicatedlog
854         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
855         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
856         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
857         actorContext.setCommitIndex(lastAppliedIndex);
858         actorContext.setLastApplied(lastAppliedIndex);
859
860         leader = new Leader(actorContext);
861
862         // Initial heartbeat.
863         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864
865         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
866         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
867
868         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
869         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
870                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
871
872         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
873                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
874
875         assertTrue(raftBehavior instanceof Leader);
876
877         // check if installsnapshot gets called with the correct values.
878
879         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
880                 InstallSnapshot.class);
881
882         assertNotNull(installSnapshot.getData());
883         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
884         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
885
886         assertEquals(currentTerm, installSnapshot.getTerm());
887     }
888
889     @Test
890     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
891         logStart("testHandleInstallSnapshotReplyLastChunk");
892
893         MockRaftActorContext actorContext = createActorContextWithFollower();
894
895         final int commitIndex = 3;
896         final int snapshotIndex = 2;
897         final int snapshotTerm = 1;
898         final int currentTerm = 2;
899
900         actorContext.setCommitIndex(commitIndex);
901
902         leader = new Leader(actorContext);
903         actorContext.setCurrentBehavior(leader);
904
905         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
906         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
907
908         // Ignore initial heartbeat.
909         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
910
911         Map<String, String> leadersSnapshot = new HashMap<>();
912         leadersSnapshot.put("1", "A");
913         leadersSnapshot.put("2", "B");
914         leadersSnapshot.put("3", "C");
915
916         // set the snapshot variables in replicatedlog
917
918         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
919         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
920         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
921
922         ByteString bs = toByteString(leadersSnapshot);
923         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
924                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
925                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
926         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
927                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
928         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
929         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
930         while (!fts.isLastChunk(fts.getChunkIndex())) {
931             fts.getNextChunk();
932             fts.incrementChunkIndex();
933         }
934
935         //clears leaders log
936         actorContext.getReplicatedLog().removeFrom(0);
937
938         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
939                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
940
941         assertTrue(raftBehavior instanceof Leader);
942
943         assertEquals(1, leader.followerLogSize());
944         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
945         assertNotNull(fli);
946         assertNull(fli.getInstallSnapshotState());
947         assertEquals(commitIndex, fli.getMatchIndex());
948         assertEquals(commitIndex + 1, fli.getNextIndex());
949         assertFalse(leader.hasSnapshot());
950     }
951
952     @Test
953     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
954         logStart("testSendSnapshotfromInstallSnapshotReply");
955
956         MockRaftActorContext actorContext = createActorContextWithFollower();
957
958         final int commitIndex = 3;
959         final int snapshotIndex = 2;
960         final int snapshotTerm = 1;
961         final int currentTerm = 2;
962
963         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
964             @Override
965             public int getSnapshotChunkSize() {
966                 return 50;
967             }
968         };
969         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
970         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
971
972         actorContext.setConfigParams(configParams);
973         actorContext.setCommitIndex(commitIndex);
974
975         leader = new Leader(actorContext);
976         actorContext.setCurrentBehavior(leader);
977
978         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
979         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
980
981         Map<String, String> leadersSnapshot = new HashMap<>();
982         leadersSnapshot.put("1", "A");
983         leadersSnapshot.put("2", "B");
984         leadersSnapshot.put("3", "C");
985
986         // set the snapshot variables in replicatedlog
987         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
988         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
989         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
990
991         ByteString bs = toByteString(leadersSnapshot);
992         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
993                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
994                 -1, null, null);
995
996         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
997
998         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
999                 InstallSnapshot.class);
1000
1001         assertEquals(1, installSnapshot.getChunkIndex());
1002         assertEquals(3, installSnapshot.getTotalChunks());
1003
1004         followerActor.underlyingActor().clear();
1005         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1006                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1007
1008         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1009
1010         assertEquals(2, installSnapshot.getChunkIndex());
1011         assertEquals(3, installSnapshot.getTotalChunks());
1012
1013         followerActor.underlyingActor().clear();
1014         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1015                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1016
1017         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1018
1019         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1020         followerActor.underlyingActor().clear();
1021         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1022                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1023
1024         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1025
1026         assertNull(installSnapshot);
1027     }
1028
1029
1030     @Test
1031     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1032         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1033
1034         MockRaftActorContext actorContext = createActorContextWithFollower();
1035
1036         final int commitIndex = 3;
1037         final int snapshotIndex = 2;
1038         final int snapshotTerm = 1;
1039         final int currentTerm = 2;
1040
1041         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1042             @Override
1043             public int getSnapshotChunkSize() {
1044                 return 50;
1045             }
1046         });
1047
1048         actorContext.setCommitIndex(commitIndex);
1049
1050         leader = new Leader(actorContext);
1051
1052         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1053         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1054
1055         Map<String, String> leadersSnapshot = new HashMap<>();
1056         leadersSnapshot.put("1", "A");
1057         leadersSnapshot.put("2", "B");
1058         leadersSnapshot.put("3", "C");
1059
1060         // set the snapshot variables in replicatedlog
1061         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1062         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1063         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1064
1065         ByteString bs = toByteString(leadersSnapshot);
1066         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1067                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1068                 -1, null, null);
1069
1070         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1071         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1072
1073         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1074                 InstallSnapshot.class);
1075
1076         assertEquals(1, installSnapshot.getChunkIndex());
1077         assertEquals(3, installSnapshot.getTotalChunks());
1078
1079         followerActor.underlyingActor().clear();
1080
1081         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1082                 FOLLOWER_ID, -1, false));
1083
1084         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1085                 TimeUnit.MILLISECONDS);
1086
1087         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1088
1089         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1090
1091         assertEquals(1, installSnapshot.getChunkIndex());
1092         assertEquals(3, installSnapshot.getTotalChunks());
1093     }
1094
1095     @Test
1096     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1097         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1098
1099         MockRaftActorContext actorContext = createActorContextWithFollower();
1100
1101         final int commitIndex = 3;
1102         final int snapshotIndex = 2;
1103         final int snapshotTerm = 1;
1104         final int currentTerm = 2;
1105
1106         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1107             @Override
1108             public int getSnapshotChunkSize() {
1109                 return 50;
1110             }
1111         });
1112
1113         actorContext.setCommitIndex(commitIndex);
1114
1115         leader = new Leader(actorContext);
1116
1117         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1118         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1119
1120         Map<String, String> leadersSnapshot = new HashMap<>();
1121         leadersSnapshot.put("1", "A");
1122         leadersSnapshot.put("2", "B");
1123         leadersSnapshot.put("3", "C");
1124
1125         // set the snapshot variables in replicatedlog
1126         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1127         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1128         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1129
1130         ByteString bs = toByteString(leadersSnapshot);
1131         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1132                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1133                 -1, null, null);
1134
1135         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1136
1137         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1138                 InstallSnapshot.class);
1139
1140         assertEquals(1, installSnapshot.getChunkIndex());
1141         assertEquals(3, installSnapshot.getTotalChunks());
1142         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1143                 installSnapshot.getLastChunkHashCode().get().intValue());
1144
1145         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1146
1147         followerActor.underlyingActor().clear();
1148
1149         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1150                 FOLLOWER_ID, 1, true));
1151
1152         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1153
1154         assertEquals(2, installSnapshot.getChunkIndex());
1155         assertEquals(3, installSnapshot.getTotalChunks());
1156         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1157     }
1158
1159     @Test
1160     public void testLeaderInstallSnapshotState() throws IOException {
1161         logStart("testLeaderInstallSnapshotState");
1162
1163         Map<String, String> leadersSnapshot = new HashMap<>();
1164         leadersSnapshot.put("1", "A");
1165         leadersSnapshot.put("2", "B");
1166         leadersSnapshot.put("3", "C");
1167
1168         ByteString bs = toByteString(leadersSnapshot);
1169         byte[] barray = bs.toByteArray();
1170
1171         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1172         fts.setSnapshotBytes(ByteSource.wrap(barray));
1173
1174         assertEquals(bs.size(), barray.length);
1175
1176         int chunkIndex = 0;
1177         for (int i = 0; i < barray.length; i = i + 50) {
1178             int length = i + 50;
1179             chunkIndex++;
1180
1181             if (i + 50 > barray.length) {
1182                 length = barray.length;
1183             }
1184
1185             byte[] chunk = fts.getNextChunk();
1186             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1187             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1188
1189             fts.markSendStatus(true);
1190             if (!fts.isLastChunk(chunkIndex)) {
1191                 fts.incrementChunkIndex();
1192             }
1193         }
1194
1195         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1196         fts.close();
1197     }
1198
1199     @Override
1200     protected Leader createBehavior(final RaftActorContext actorContext) {
1201         return new Leader(actorContext);
1202     }
1203
1204     @Override
1205     protected MockRaftActorContext createActorContext() {
1206         return createActorContext(leaderActor);
1207     }
1208
1209     @Override
1210     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1211         return createActorContext(LEADER_ID, actorRef);
1212     }
1213
1214     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1215         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1216         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1217         configParams.setElectionTimeoutFactor(100000);
1218         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1219         context.setConfigParams(configParams);
1220         context.setPayloadVersion(payloadVersion);
1221         return context;
1222     }
1223
1224     private MockRaftActorContext createActorContextWithFollower() {
1225         MockRaftActorContext actorContext = createActorContext();
1226         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1227                 followerActor.path().toString()).build());
1228         return actorContext;
1229     }
1230
1231     private MockRaftActorContext createFollowerActorContextWithLeader() {
1232         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1233         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1234         followerConfig.setElectionTimeoutFactor(10000);
1235         followerActorContext.setConfigParams(followerConfig);
1236         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1237         return followerActorContext;
1238     }
1239
1240     @Test
1241     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1242         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1243
1244         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1245
1246         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1247
1248         Follower follower = new Follower(followerActorContext);
1249         followerActor.underlyingActor().setBehavior(follower);
1250         followerActorContext.setCurrentBehavior(follower);
1251
1252         Map<String, String> peerAddresses = new HashMap<>();
1253         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1254
1255         leaderActorContext.setPeerAddresses(peerAddresses);
1256
1257         leaderActorContext.getReplicatedLog().removeFrom(0);
1258
1259         //create 3 entries
1260         leaderActorContext.setReplicatedLog(
1261                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1262
1263         leaderActorContext.setCommitIndex(1);
1264
1265         followerActorContext.getReplicatedLog().removeFrom(0);
1266
1267         // follower too has the exact same log entries and has the same commit index
1268         followerActorContext.setReplicatedLog(
1269                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1270
1271         followerActorContext.setCommitIndex(1);
1272
1273         leader = new Leader(leaderActorContext);
1274         leaderActorContext.setCurrentBehavior(leader);
1275
1276         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1277
1278         assertEquals(-1, appendEntries.getLeaderCommit());
1279         assertEquals(0, appendEntries.getEntries().size());
1280         assertEquals(0, appendEntries.getPrevLogIndex());
1281
1282         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1283                 leaderActor, AppendEntriesReply.class);
1284
1285         assertEquals(2, appendEntriesReply.getLogLastIndex());
1286         assertEquals(1, appendEntriesReply.getLogLastTerm());
1287
1288         // follower returns its next index
1289         assertEquals(2, appendEntriesReply.getLogLastIndex());
1290         assertEquals(1, appendEntriesReply.getLogLastTerm());
1291
1292         follower.close();
1293     }
1294
1295     @Test
1296     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1297         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1298
1299         final MockRaftActorContext leaderActorContext = createActorContext();
1300
1301         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1302         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1303
1304         Follower follower = new Follower(followerActorContext);
1305         followerActor.underlyingActor().setBehavior(follower);
1306         followerActorContext.setCurrentBehavior(follower);
1307
1308         Map<String, String> leaderPeerAddresses = new HashMap<>();
1309         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1310
1311         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1312
1313         leaderActorContext.getReplicatedLog().removeFrom(0);
1314
1315         leaderActorContext.setReplicatedLog(
1316                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1317
1318         leaderActorContext.setCommitIndex(1);
1319
1320         followerActorContext.getReplicatedLog().removeFrom(0);
1321
1322         followerActorContext.setReplicatedLog(
1323                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1324
1325         // follower has the same log entries but its commit index > leaders commit index
1326         followerActorContext.setCommitIndex(2);
1327
1328         leader = new Leader(leaderActorContext);
1329
1330         // Initial heartbeat
1331         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1332
1333         assertEquals(-1, appendEntries.getLeaderCommit());
1334         assertEquals(0, appendEntries.getEntries().size());
1335         assertEquals(0, appendEntries.getPrevLogIndex());
1336
1337         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1338                 leaderActor, AppendEntriesReply.class);
1339
1340         assertEquals(2, appendEntriesReply.getLogLastIndex());
1341         assertEquals(1, appendEntriesReply.getLogLastTerm());
1342
1343         leaderActor.underlyingActor().setBehavior(follower);
1344         leader.handleMessage(followerActor, appendEntriesReply);
1345
1346         leaderActor.underlyingActor().clear();
1347         followerActor.underlyingActor().clear();
1348
1349         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1350                 TimeUnit.MILLISECONDS);
1351
1352         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1353
1354         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1355
1356         assertEquals(2, appendEntries.getLeaderCommit());
1357         assertEquals(0, appendEntries.getEntries().size());
1358         assertEquals(2, appendEntries.getPrevLogIndex());
1359
1360         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1361
1362         assertEquals(2, appendEntriesReply.getLogLastIndex());
1363         assertEquals(1, appendEntriesReply.getLogLastTerm());
1364
1365         assertEquals(2, followerActorContext.getCommitIndex());
1366
1367         follower.close();
1368     }
1369
1370     @Test
1371     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1372         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1373
1374         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1375         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1376                 new FiniteDuration(1000, TimeUnit.SECONDS));
1377
1378         leaderActorContext.setReplicatedLog(
1379                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1380         long leaderCommitIndex = 2;
1381         leaderActorContext.setCommitIndex(leaderCommitIndex);
1382         leaderActorContext.setLastApplied(leaderCommitIndex);
1383
1384         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1385         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1386
1387         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1388
1389         followerActorContext.setReplicatedLog(
1390                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1391         followerActorContext.setCommitIndex(0);
1392         followerActorContext.setLastApplied(0);
1393
1394         Follower follower = new Follower(followerActorContext);
1395         followerActor.underlyingActor().setBehavior(follower);
1396
1397         leader = new Leader(leaderActorContext);
1398
1399         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1400         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1401                 AppendEntriesReply.class);
1402
1403         MessageCollectorActor.clearMessages(followerActor);
1404         MessageCollectorActor.clearMessages(leaderActor);
1405
1406         // Verify initial AppendEntries sent.
1407         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1408         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1409         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1410
1411         leaderActor.underlyingActor().setBehavior(leader);
1412
1413         leader.handleMessage(followerActor, appendEntriesReply);
1414
1415         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1416         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1417
1418         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1420         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1421
1422         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1423         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1424                 appendEntries.getEntries().get(0).getData());
1425         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1426         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1427                 appendEntries.getEntries().get(1).getData());
1428
1429         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1430         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1431
1432         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1433
1434         ApplyState applyState = applyStateList.get(0);
1435         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1436         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1438                 applyState.getReplicatedLogEntry().getData());
1439
1440         applyState = applyStateList.get(1);
1441         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1442         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1444                 applyState.getReplicatedLogEntry().getData());
1445
1446         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1447         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1448     }
1449
1450     @Test
1451     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1452         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1453
1454         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1455         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1456                 new FiniteDuration(1000, TimeUnit.SECONDS));
1457
1458         leaderActorContext.setReplicatedLog(
1459                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1460         long leaderCommitIndex = 1;
1461         leaderActorContext.setCommitIndex(leaderCommitIndex);
1462         leaderActorContext.setLastApplied(leaderCommitIndex);
1463
1464         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1465         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1466
1467         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1468
1469         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1470         followerActorContext.setCommitIndex(-1);
1471         followerActorContext.setLastApplied(-1);
1472
1473         Follower follower = new Follower(followerActorContext);
1474         followerActor.underlyingActor().setBehavior(follower);
1475         followerActorContext.setCurrentBehavior(follower);
1476
1477         leader = new Leader(leaderActorContext);
1478
1479         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1480         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1481                 AppendEntriesReply.class);
1482
1483         MessageCollectorActor.clearMessages(followerActor);
1484         MessageCollectorActor.clearMessages(leaderActor);
1485
1486         // Verify initial AppendEntries sent with the leader's current commit index.
1487         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1488         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1489         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1490
1491         leaderActor.underlyingActor().setBehavior(leader);
1492         leaderActorContext.setCurrentBehavior(leader);
1493
1494         leader.handleMessage(followerActor, appendEntriesReply);
1495
1496         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1497         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1498
1499         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1500         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1501         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1502
1503         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1504         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1505                 appendEntries.getEntries().get(0).getData());
1506         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1507         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1508                 appendEntries.getEntries().get(1).getData());
1509
1510         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1511         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1512
1513         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1514
1515         ApplyState applyState = applyStateList.get(0);
1516         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1517         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1518         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1519                 applyState.getReplicatedLogEntry().getData());
1520
1521         applyState = applyStateList.get(1);
1522         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1523         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1524         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1525                 applyState.getReplicatedLogEntry().getData());
1526
1527         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1528         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1529     }
1530
1531     @Test
1532     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1533         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1534
1535         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1536         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1537                 new FiniteDuration(1000, TimeUnit.SECONDS));
1538
1539         leaderActorContext.setReplicatedLog(
1540                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1541         long leaderCommitIndex = 1;
1542         leaderActorContext.setCommitIndex(leaderCommitIndex);
1543         leaderActorContext.setLastApplied(leaderCommitIndex);
1544
1545         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1546         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1547
1548         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1549
1550         followerActorContext.setReplicatedLog(
1551                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1552         followerActorContext.setCommitIndex(-1);
1553         followerActorContext.setLastApplied(-1);
1554
1555         Follower follower = new Follower(followerActorContext);
1556         followerActor.underlyingActor().setBehavior(follower);
1557         followerActorContext.setCurrentBehavior(follower);
1558
1559         leader = new Leader(leaderActorContext);
1560
1561         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1562         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1563                 AppendEntriesReply.class);
1564
1565         MessageCollectorActor.clearMessages(followerActor);
1566         MessageCollectorActor.clearMessages(leaderActor);
1567
1568         // Verify initial AppendEntries sent with the leader's current commit index.
1569         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1570         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1571         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1572
1573         leaderActor.underlyingActor().setBehavior(leader);
1574         leaderActorContext.setCurrentBehavior(leader);
1575
1576         leader.handleMessage(followerActor, appendEntriesReply);
1577
1578         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1579         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1580
1581         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1582         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1583         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1584
1585         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1586         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1587         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1588                 appendEntries.getEntries().get(0).getData());
1589         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1590         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1591         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1592                 appendEntries.getEntries().get(1).getData());
1593
1594         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1595         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1596
1597         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1598
1599         ApplyState applyState = applyStateList.get(0);
1600         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1601         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1602         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1603                 applyState.getReplicatedLogEntry().getData());
1604
1605         applyState = applyStateList.get(1);
1606         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1607         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1608         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1609                 applyState.getReplicatedLogEntry().getData());
1610
1611         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1612         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1613         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1614     }
1615
1616     @Test
1617     public void testHandleAppendEntriesReplyWithNewerTerm() {
1618         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1619
1620         MockRaftActorContext leaderActorContext = createActorContext();
1621         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1622                 new FiniteDuration(10000, TimeUnit.SECONDS));
1623
1624         leaderActorContext.setReplicatedLog(
1625                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1626
1627         leader = new Leader(leaderActorContext);
1628         leaderActor.underlyingActor().setBehavior(leader);
1629         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1630
1631         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1632                 AppendEntriesReply.class);
1633
1634         assertEquals(false, appendEntriesReply.isSuccess());
1635         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1636
1637         MessageCollectorActor.clearMessages(leaderActor);
1638     }
1639
1640     @Test
1641     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1642         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1643
1644         MockRaftActorContext leaderActorContext = createActorContext();
1645         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1646                 new FiniteDuration(10000, TimeUnit.SECONDS));
1647
1648         leaderActorContext.setReplicatedLog(
1649                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1650         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1651
1652         leader = new Leader(leaderActorContext);
1653         leaderActor.underlyingActor().setBehavior(leader);
1654         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1655
1656         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1657                 AppendEntriesReply.class);
1658
1659         assertEquals(false, appendEntriesReply.isSuccess());
1660         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1661
1662         MessageCollectorActor.clearMessages(leaderActor);
1663     }
1664
1665     @Test
1666     public void testHandleAppendEntriesReplySuccess() throws Exception {
1667         logStart("testHandleAppendEntriesReplySuccess");
1668
1669         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1670
1671         leaderActorContext.setReplicatedLog(
1672                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1673
1674         leaderActorContext.setCommitIndex(1);
1675         leaderActorContext.setLastApplied(1);
1676         leaderActorContext.getTermInformation().update(1, "leader");
1677
1678         leader = new Leader(leaderActorContext);
1679
1680         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1681
1682         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1683         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1684
1685         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1686
1687         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1688
1689         assertEquals(RaftState.Leader, raftActorBehavior.state());
1690
1691         assertEquals(2, leaderActorContext.getCommitIndex());
1692
1693         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1694                 leaderActor, ApplyJournalEntries.class);
1695
1696         assertEquals(2, leaderActorContext.getLastApplied());
1697
1698         assertEquals(2, applyJournalEntries.getToIndex());
1699
1700         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1701                 ApplyState.class);
1702
1703         assertEquals(1,applyStateList.size());
1704
1705         ApplyState applyState = applyStateList.get(0);
1706
1707         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1708
1709         assertEquals(2, followerInfo.getMatchIndex());
1710         assertEquals(3, followerInfo.getNextIndex());
1711         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1712         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1713     }
1714
1715     @Test
1716     public void testHandleAppendEntriesReplyUnknownFollower() {
1717         logStart("testHandleAppendEntriesReplyUnknownFollower");
1718
1719         MockRaftActorContext leaderActorContext = createActorContext();
1720
1721         leader = new Leader(leaderActorContext);
1722
1723         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1724
1725         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1726
1727         assertEquals(RaftState.Leader, raftActorBehavior.state());
1728     }
1729
1730     @Test
1731     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1732         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1733
1734         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1735         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1736                 new FiniteDuration(1000, TimeUnit.SECONDS));
1737         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1738
1739         leaderActorContext.setReplicatedLog(
1740                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1741         long leaderCommitIndex = 3;
1742         leaderActorContext.setCommitIndex(leaderCommitIndex);
1743         leaderActorContext.setLastApplied(leaderCommitIndex);
1744
1745         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1746         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1747         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1748         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1749
1750         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1751
1752         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1753         followerActorContext.setCommitIndex(-1);
1754         followerActorContext.setLastApplied(-1);
1755
1756         Follower follower = new Follower(followerActorContext);
1757         followerActor.underlyingActor().setBehavior(follower);
1758         followerActorContext.setCurrentBehavior(follower);
1759
1760         leader = new Leader(leaderActorContext);
1761
1762         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1763         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1764                 AppendEntriesReply.class);
1765
1766         MessageCollectorActor.clearMessages(followerActor);
1767         MessageCollectorActor.clearMessages(leaderActor);
1768
1769         // Verify initial AppendEntries sent with the leader's current commit index.
1770         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1771         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1772         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1773
1774         leaderActor.underlyingActor().setBehavior(leader);
1775         leaderActorContext.setCurrentBehavior(leader);
1776
1777         leader.handleMessage(followerActor, appendEntriesReply);
1778
1779         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1780                 AppendEntries.class, 2);
1781         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1782
1783         appendEntries = appendEntriesList.get(0);
1784         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1785         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1786         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1787
1788         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1789         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1790                 appendEntries.getEntries().get(0).getData());
1791         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1792         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1793                 appendEntries.getEntries().get(1).getData());
1794
1795         appendEntries = appendEntriesList.get(1);
1796         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1797         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1798         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1799
1800         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1801         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1802                 appendEntries.getEntries().get(0).getData());
1803         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1804         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1805                 appendEntries.getEntries().get(1).getData());
1806
1807         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1808         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1809
1810         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1811
1812         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1813         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1814     }
1815
1816     @Test
1817     public void testHandleRequestVoteReply() {
1818         logStart("testHandleRequestVoteReply");
1819
1820         MockRaftActorContext leaderActorContext = createActorContext();
1821
1822         leader = new Leader(leaderActorContext);
1823
1824         // Should be a no-op.
1825         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1826                 new RequestVoteReply(1, true));
1827
1828         assertEquals(RaftState.Leader, raftActorBehavior.state());
1829
1830         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1831
1832         assertEquals(RaftState.Leader, raftActorBehavior.state());
1833     }
1834
1835     @Test
1836     public void testIsolatedLeaderCheckNoFollowers() {
1837         logStart("testIsolatedLeaderCheckNoFollowers");
1838
1839         MockRaftActorContext leaderActorContext = createActorContext();
1840
1841         leader = new Leader(leaderActorContext);
1842         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1843         assertTrue(newBehavior instanceof Leader);
1844     }
1845
1846     @Test
1847     public void testIsolatedLeaderCheckNoVotingFollowers() {
1848         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1849
1850         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1851         Follower follower = new Follower(followerActorContext);
1852         followerActor.underlyingActor().setBehavior(follower);
1853
1854         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1855         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1856                 new FiniteDuration(1000, TimeUnit.SECONDS));
1857         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1858
1859         leader = new Leader(leaderActorContext);
1860         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1861         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1862         assertTrue("Expected Leader", newBehavior instanceof Leader);
1863     }
1864
1865     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1866         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1867         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1868
1869         MockRaftActorContext leaderActorContext = createActorContext();
1870
1871         Map<String, String> peerAddresses = new HashMap<>();
1872         peerAddresses.put("follower-1", followerActor1.path().toString());
1873         peerAddresses.put("follower-2", followerActor2.path().toString());
1874
1875         leaderActorContext.setPeerAddresses(peerAddresses);
1876         leaderActorContext.setRaftPolicy(raftPolicy);
1877
1878         leader = new Leader(leaderActorContext);
1879
1880         leader.markFollowerActive("follower-1");
1881         leader.markFollowerActive("follower-2");
1882         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1883         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1884
1885         // kill 1 follower and verify if that got killed
1886         final JavaTestKit probe = new JavaTestKit(getSystem());
1887         probe.watch(followerActor1);
1888         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1889         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1890         assertEquals(termMsg1.getActor(), followerActor1);
1891
1892         leader.markFollowerInActive("follower-1");
1893         leader.markFollowerActive("follower-2");
1894         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1895         assertTrue("Behavior not instance of Leader when majority of followers are active",
1896                 newBehavior instanceof Leader);
1897
1898         // kill 2nd follower and leader should change to Isolated leader
1899         followerActor2.tell(PoisonPill.getInstance(), null);
1900         probe.watch(followerActor2);
1901         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1902         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1903         assertEquals(termMsg2.getActor(), followerActor2);
1904
1905         leader.markFollowerInActive("follower-2");
1906         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1907     }
1908
1909     @Test
1910     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1911         logStart("testIsolatedLeaderCheckTwoFollowers");
1912
1913         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1914
1915         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1916             newBehavior instanceof IsolatedLeader);
1917     }
1918
1919     @Test
1920     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1921         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1922
1923         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1924
1925         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1926                 newBehavior instanceof Leader);
1927     }
1928
1929     @Test
1930     public void testLaggingFollowerStarvation() throws Exception {
1931         logStart("testLaggingFollowerStarvation");
1932
1933         String leaderActorId = actorFactory.generateActorId("leader");
1934         String follower1ActorId = actorFactory.generateActorId("follower");
1935         String follower2ActorId = actorFactory.generateActorId("follower");
1936
1937         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1938         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1939
1940         MockRaftActorContext leaderActorContext =
1941                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1942
1943         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1944         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1945         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1946
1947         leaderActorContext.setConfigParams(configParams);
1948
1949         leaderActorContext.setReplicatedLog(
1950                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1951
1952         Map<String, String> peerAddresses = new HashMap<>();
1953         peerAddresses.put(follower1ActorId,
1954                 follower1Actor.path().toString());
1955         peerAddresses.put(follower2ActorId,
1956                 follower2Actor.path().toString());
1957
1958         leaderActorContext.setPeerAddresses(peerAddresses);
1959         leaderActorContext.getTermInformation().update(1, leaderActorId);
1960
1961         leader = createBehavior(leaderActorContext);
1962
1963         leaderActor.underlyingActor().setBehavior(leader);
1964
1965         for (int i = 1; i < 6; i++) {
1966             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1967             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1968                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1969             assertTrue(newBehavior == leader);
1970             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1971         }
1972
1973         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1974         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1975
1976         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1977                 heartbeats.size() > 1);
1978
1979         // Check if follower-2 got AppendEntries during this time and was not starved
1980         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1981
1982         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1983                 appendEntries.size() > 1);
1984     }
1985
1986     @Test
1987     public void testReplicationConsensusWithNonVotingFollower() {
1988         logStart("testReplicationConsensusWithNonVotingFollower");
1989
1990         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1991         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1992                 new FiniteDuration(1000, TimeUnit.SECONDS));
1993
1994         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1995         leaderActorContext.setCommitIndex(-1);
1996         leaderActorContext.setLastApplied(-1);
1997
1998         String nonVotingFollowerId = "nonvoting-follower";
1999         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
2000                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2001
2002         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2003                 VotingState.NON_VOTING);
2004
2005         leader = new Leader(leaderActorContext);
2006         leaderActorContext.setCurrentBehavior(leader);
2007
2008         // Ignore initial heartbeats
2009         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2010         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2011
2012         MessageCollectorActor.clearMessages(followerActor);
2013         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2014         MessageCollectorActor.clearMessages(leaderActor);
2015
2016         // Send a Replicate message and wait for AppendEntries.
2017         sendReplicate(leaderActorContext, 0);
2018
2019         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2020         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2021
2022         // Send reply only from the voting follower and verify consensus via ApplyState.
2023         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2024
2025         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2026
2027         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2028
2029         MessageCollectorActor.clearMessages(followerActor);
2030         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2031         MessageCollectorActor.clearMessages(leaderActor);
2032
2033         // Send another Replicate message
2034         sendReplicate(leaderActorContext, 1);
2035
2036         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2037         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2038                 AppendEntries.class);
2039         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2040         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2041
2042         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2043         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2044
2045         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2046
2047         // Send reply from the voting follower and verify consensus.
2048         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2049
2050         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2051     }
2052
2053     @Test
2054     public void testTransferLeadershipWithFollowerInSync() {
2055         logStart("testTransferLeadershipWithFollowerInSync");
2056
2057         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2058         leaderActorContext.setLastApplied(-1);
2059         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2060                 new FiniteDuration(1000, TimeUnit.SECONDS));
2061         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2062
2063         leader = new Leader(leaderActorContext);
2064         leaderActorContext.setCurrentBehavior(leader);
2065
2066         // Initial heartbeat
2067         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2068         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2069         MessageCollectorActor.clearMessages(followerActor);
2070
2071         sendReplicate(leaderActorContext, 0);
2072         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073
2074         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2075         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2076         MessageCollectorActor.clearMessages(followerActor);
2077
2078         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2079         leader.transferLeadership(mockTransferCohort);
2080
2081         verify(mockTransferCohort, never()).transferComplete();
2082         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2083         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2084
2085         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2086         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2087
2088         // Leader should force an election timeout
2089         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2090
2091         verify(mockTransferCohort).transferComplete();
2092     }
2093
2094     @Test
2095     public void testTransferLeadershipWithEmptyLog() {
2096         logStart("testTransferLeadershipWithEmptyLog");
2097
2098         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2099         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2100                 new FiniteDuration(1000, TimeUnit.SECONDS));
2101         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2102
2103         leader = new Leader(leaderActorContext);
2104         leaderActorContext.setCurrentBehavior(leader);
2105
2106         // Initial heartbeat
2107         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2108         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2109         MessageCollectorActor.clearMessages(followerActor);
2110
2111         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2112         leader.transferLeadership(mockTransferCohort);
2113
2114         verify(mockTransferCohort, never()).transferComplete();
2115         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2116         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2117
2118         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2119         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2120
2121         // Leader should force an election timeout
2122         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2123
2124         verify(mockTransferCohort).transferComplete();
2125     }
2126
2127     @Test
2128     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2129         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2130
2131         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2132         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2133                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2134
2135         leader = new Leader(leaderActorContext);
2136         leaderActorContext.setCurrentBehavior(leader);
2137
2138         // Initial heartbeat
2139         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140         MessageCollectorActor.clearMessages(followerActor);
2141
2142         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2143         leader.transferLeadership(mockTransferCohort);
2144
2145         verify(mockTransferCohort, never()).transferComplete();
2146
2147         // Sync up the follower.
2148         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2150         MessageCollectorActor.clearMessages(followerActor);
2151
2152         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2153                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2154         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2155         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2156         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2157
2158         // Leader should force an election timeout
2159         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2160
2161         verify(mockTransferCohort).transferComplete();
2162     }
2163
2164     @Test
2165     public void testTransferLeadershipWithFollowerSyncTimeout() {
2166         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2167
2168         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2169         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2170                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2171         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2172         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2173
2174         leader = new Leader(leaderActorContext);
2175         leaderActorContext.setCurrentBehavior(leader);
2176
2177         // Initial heartbeat
2178         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2179         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2180         MessageCollectorActor.clearMessages(followerActor);
2181
2182         sendReplicate(leaderActorContext, 0);
2183         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2184
2185         MessageCollectorActor.clearMessages(followerActor);
2186
2187         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2188         leader.transferLeadership(mockTransferCohort);
2189
2190         verify(mockTransferCohort, never()).transferComplete();
2191
2192         // Send heartbeats to time out the transfer.
2193         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2194             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2195                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2196             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2197         }
2198
2199         verify(mockTransferCohort).abortTransfer();
2200         verify(mockTransferCohort, never()).transferComplete();
2201         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2202     }
2203
2204     @Override
2205     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2206             ActorRef actorRef, RaftRPC rpc) throws Exception {
2207         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2208         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2209     }
2210
2211     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2212
2213         private final long electionTimeOutIntervalMillis;
2214         private final int snapshotChunkSize;
2215
2216         MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2217             super();
2218             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2219             this.snapshotChunkSize = snapshotChunkSize;
2220         }
2221
2222         @Override
2223         public FiniteDuration getElectionTimeOutInterval() {
2224             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2225         }
2226
2227         @Override
2228         public int getSnapshotChunkSize() {
2229             return snapshotChunkSize;
2230         }
2231     }
2232 }