99b647b000f3851e1777396dc56e196430dba562
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.google.protobuf.ByteString;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37 import org.junit.After;
38 import org.junit.Test;
39 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
40 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
41 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
42 import org.opendaylight.controller.cluster.raft.RaftActorContext;
43 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
44 import org.opendaylight.controller.cluster.raft.RaftState;
45 import org.opendaylight.controller.cluster.raft.RaftVersions;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
63 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
64 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
65 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
66 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
67 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
68 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
69 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
70 import org.opendaylight.yangtools.concepts.Identifier;
71 import scala.concurrent.duration.FiniteDuration;
72
73 public class LeaderTest extends AbstractLeaderTest<Leader> {
74
75     static final String FOLLOWER_ID = "follower";
76     public static final String LEADER_ID = "leader";
77
78     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
79             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
80
81     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
82             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
83
84     private Leader leader;
85     private final short payloadVersion = 5;
86
87     @Override
88     @After
89     public void tearDown() throws Exception {
90         if (leader != null) {
91             leader.close();
92         }
93
94         super.tearDown();
95     }
96
97     @Test
98     public void testHandleMessageForUnknownMessage() throws Exception {
99         logStart("testHandleMessageForUnknownMessage");
100
101         leader = new Leader(createActorContext());
102
103         // handle message should null when it receives an unknown message
104         assertNull(leader.handleMessage(followerActor, "foo"));
105     }
106
107     @Test
108     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
109         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
110
111         MockRaftActorContext actorContext = createActorContextWithFollower();
112         actorContext.setCommitIndex(-1);
113         actorContext.setPayloadVersion(payloadVersion);
114
115         long term = 1;
116         actorContext.getTermInformation().update(term, "");
117
118         leader = new Leader(actorContext);
119         actorContext.setCurrentBehavior(leader);
120
121         // Leader should send an immediate heartbeat with no entries as follower is inactive.
122         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
123         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
124         assertEquals("getTerm", term, appendEntries.getTerm());
125         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
126         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
127         assertEquals("Entries size", 0, appendEntries.getEntries().size());
128         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
129
130         // The follower would normally reply - simulate that explicitly here.
131         leader.handleMessage(followerActor, new AppendEntriesReply(
132                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
133         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
134
135         followerActor.underlyingActor().clear();
136
137         // Sleep for the heartbeat interval so AppendEntries is sent.
138         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
139                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
140
141         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
142
143         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
144         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
145         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
146         assertEquals("Entries size", 1, appendEntries.getEntries().size());
147         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
148         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
149         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
150     }
151
152
153     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
154         return sendReplicate(actorContext, 1, index);
155     }
156
157     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
158         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
159         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
160         actorContext.getReplicatedLog().append(newEntry);
161         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
162     }
163
164     @Test
165     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
166         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
167
168         MockRaftActorContext actorContext = createActorContextWithFollower();
169
170         long term = 1;
171         actorContext.getTermInformation().update(term, "");
172
173         leader = new Leader(actorContext);
174
175         // Leader will send an immediate heartbeat - ignore it.
176         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
177
178         // The follower would normally reply - simulate that explicitly here.
179         long lastIndex = actorContext.getReplicatedLog().lastIndex();
180         leader.handleMessage(followerActor, new AppendEntriesReply(
181                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
182         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
183
184         followerActor.underlyingActor().clear();
185
186         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
187
188         // State should not change
189         assertTrue(raftBehavior instanceof Leader);
190
191         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
193         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
194         assertEquals("Entries size", 1, appendEntries.getEntries().size());
195         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
196         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
197         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
198         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
199     }
200
201     @Test
202     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
203         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
204
205         MockRaftActorContext actorContext = createActorContextWithFollower();
206         actorContext.setCommitIndex(-1);
207         actorContext.setLastApplied(-1);
208
209         // The raft context is initialized with a couple log entries. However the commitIndex
210         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
211         // committed and applied. Now it regains leadership with a higher term (2).
212         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
213         long newTerm = prevTerm + 1;
214         actorContext.getTermInformation().update(newTerm, "");
215
216         leader = new Leader(actorContext);
217         actorContext.setCurrentBehavior(leader);
218
219         // Leader will send an immediate heartbeat - ignore it.
220         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
221
222         // The follower replies with the leader's current last index and term, simulating that it is
223         // up to date with the leader.
224         long lastIndex = actorContext.getReplicatedLog().lastIndex();
225         leader.handleMessage(followerActor, new AppendEntriesReply(
226                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
227
228         // The commit index should not get updated even though consensus was reached. This is b/c the
229         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
230         // from previous terms by counting replicas".
231         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
232
233         followerActor.underlyingActor().clear();
234
235         // Now replicate a new entry with the new term 2.
236         long newIndex = lastIndex + 1;
237         sendReplicate(actorContext, newTerm, newIndex);
238
239         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
240         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
241         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
242         assertEquals("Entries size", 1, appendEntries.getEntries().size());
243         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
244         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
245         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
246
247         // The follower replies with success. The leader should now update the commit index to the new index
248         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
249         // prior entries are committed indirectly".
250         leader.handleMessage(followerActor, new AppendEntriesReply(
251                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
252
253         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
254     }
255
256     @Test
257     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
258         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
259
260         MockRaftActorContext actorContext = createActorContextWithFollower();
261         actorContext.setRaftPolicy(createRaftPolicy(true, true));
262
263         long term = 1;
264         actorContext.getTermInformation().update(term, "");
265
266         leader = new Leader(actorContext);
267
268         // Leader will send an immediate heartbeat - ignore it.
269         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
270
271         // The follower would normally reply - simulate that explicitly here.
272         long lastIndex = actorContext.getReplicatedLog().lastIndex();
273         leader.handleMessage(followerActor, new AppendEntriesReply(
274                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
275         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
276
277         followerActor.underlyingActor().clear();
278
279         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
280
281         // State should not change
282         assertTrue(raftBehavior instanceof Leader);
283
284         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
286         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
287         assertEquals("Entries size", 1, appendEntries.getEntries().size());
288         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
289         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
290         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
291         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
292     }
293
294     @Test
295     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
296         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
297
298         MockRaftActorContext actorContext = createActorContextWithFollower();
299         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
300             @Override
301             public FiniteDuration getHeartBeatInterval() {
302                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
303             }
304         });
305
306         long term = 1;
307         actorContext.getTermInformation().update(term, "");
308
309         leader = new Leader(actorContext);
310
311         // Leader will send an immediate heartbeat - ignore it.
312         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
313
314         // The follower would normally reply - simulate that explicitly here.
315         long lastIndex = actorContext.getReplicatedLog().lastIndex();
316         leader.handleMessage(followerActor, new AppendEntriesReply(
317                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
318         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
319
320         followerActor.underlyingActor().clear();
321
322         for (int i = 0; i < 5; i++) {
323             sendReplicate(actorContext, lastIndex + i + 1);
324         }
325
326         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
327         // We expect only 1 message to be sent because of two reasons,
328         // - an append entries reply was not received
329         // - the heartbeat interval has not expired
330         // In this scenario if multiple messages are sent they would likely be duplicates
331         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
332     }
333
334     @Test
335     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
336         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
337
338         MockRaftActorContext actorContext = createActorContextWithFollower();
339         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
340             @Override
341             public FiniteDuration getHeartBeatInterval() {
342                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
343             }
344         });
345
346         long term = 1;
347         actorContext.getTermInformation().update(term, "");
348
349         leader = new Leader(actorContext);
350
351         // Leader will send an immediate heartbeat - ignore it.
352         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
353
354         // The follower would normally reply - simulate that explicitly here.
355         long lastIndex = actorContext.getReplicatedLog().lastIndex();
356         leader.handleMessage(followerActor, new AppendEntriesReply(
357                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
358         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
359
360         followerActor.underlyingActor().clear();
361
362         for (int i = 0; i < 3; i++) {
363             sendReplicate(actorContext, lastIndex + i + 1);
364             leader.handleMessage(followerActor, new AppendEntriesReply(
365                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
366
367         }
368
369         for (int i = 3; i < 5; i++) {
370             sendReplicate(actorContext, lastIndex + i + 1);
371         }
372
373         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
374         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
375         // get sent to the follower - but not the 5th
376         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
377
378         for (int i = 0; i < 4; i++) {
379             long expected = allMessages.get(i).getEntries().get(0).getIndex();
380             assertEquals(expected, i + 2);
381         }
382     }
383
384     @Test
385     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
386         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
387
388         MockRaftActorContext actorContext = createActorContextWithFollower();
389         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
390             @Override
391             public FiniteDuration getHeartBeatInterval() {
392                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
393             }
394         });
395
396         long term = 1;
397         actorContext.getTermInformation().update(term, "");
398
399         leader = new Leader(actorContext);
400
401         // Leader will send an immediate heartbeat - ignore it.
402         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
403
404         // The follower would normally reply - simulate that explicitly here.
405         long lastIndex = actorContext.getReplicatedLog().lastIndex();
406         leader.handleMessage(followerActor, new AppendEntriesReply(
407                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
408         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
409
410         followerActor.underlyingActor().clear();
411
412         sendReplicate(actorContext, lastIndex + 1);
413
414         // Wait slightly longer than heartbeat duration
415         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
416
417         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
418
419         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
420         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
421
422         assertEquals(1, allMessages.get(0).getEntries().size());
423         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
424         assertEquals(1, allMessages.get(1).getEntries().size());
425         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
426
427     }
428
429     @Test
430     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
431         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
432
433         MockRaftActorContext actorContext = createActorContextWithFollower();
434         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
435             @Override
436             public FiniteDuration getHeartBeatInterval() {
437                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
438             }
439         });
440
441         long term = 1;
442         actorContext.getTermInformation().update(term, "");
443
444         leader = new Leader(actorContext);
445
446         // Leader will send an immediate heartbeat - ignore it.
447         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
448
449         // The follower would normally reply - simulate that explicitly here.
450         long lastIndex = actorContext.getReplicatedLog().lastIndex();
451         leader.handleMessage(followerActor, new AppendEntriesReply(
452                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
453         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
454
455         followerActor.underlyingActor().clear();
456
457         for (int i = 0; i < 3; i++) {
458             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
459             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
460         }
461
462         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
463         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
464     }
465
466     @Test
467     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
468         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
469
470         MockRaftActorContext actorContext = createActorContextWithFollower();
471         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
472             @Override
473             public FiniteDuration getHeartBeatInterval() {
474                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
475             }
476         });
477
478         long term = 1;
479         actorContext.getTermInformation().update(term, "");
480
481         leader = new Leader(actorContext);
482
483         // Leader will send an immediate heartbeat - ignore it.
484         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
485
486         // The follower would normally reply - simulate that explicitly here.
487         long lastIndex = actorContext.getReplicatedLog().lastIndex();
488         leader.handleMessage(followerActor, new AppendEntriesReply(
489                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
490         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
491
492         followerActor.underlyingActor().clear();
493
494         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
495         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
496         sendReplicate(actorContext, lastIndex + 1);
497
498         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
499         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
500
501         assertEquals(0, allMessages.get(0).getEntries().size());
502         assertEquals(1, allMessages.get(1).getEntries().size());
503     }
504
505
506     @Test
507     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
508         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
509
510         MockRaftActorContext actorContext = createActorContext();
511
512         leader = new Leader(actorContext);
513
514         actorContext.setLastApplied(0);
515
516         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
517         long term = actorContext.getTermInformation().getCurrentTerm();
518         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
519                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
520
521         actorContext.getReplicatedLog().append(newEntry);
522
523         final Identifier id = new MockIdentifier("state-id");
524         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
525                 new Replicate(leaderActor, id, newEntry, true));
526
527         // State should not change
528         assertTrue(raftBehavior instanceof Leader);
529
530         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
531
532         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
533         // one since lastApplied state is 0.
534         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
535                 leaderActor, ApplyState.class);
536         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
537
538         for (int i = 0; i <= newLogIndex - 1; i++ ) {
539             ApplyState applyState = applyStateList.get(i);
540             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
541             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
542         }
543
544         ApplyState last = applyStateList.get((int) newLogIndex - 1);
545         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
546         assertEquals("getIdentifier", id, last.getIdentifier());
547     }
548
549     @Test
550     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
551         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
552
553         final MockRaftActorContext actorContext = createActorContextWithFollower();
554
555         Map<String, String> leadersSnapshot = new HashMap<>();
556         leadersSnapshot.put("1", "A");
557         leadersSnapshot.put("2", "B");
558         leadersSnapshot.put("3", "C");
559
560         //clears leaders log
561         actorContext.getReplicatedLog().removeFrom(0);
562
563         final int commitIndex = 3;
564         final int snapshotIndex = 2;
565         final int snapshotTerm = 1;
566
567         // set the snapshot variables in replicatedlog
568         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
569         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
570         actorContext.setCommitIndex(commitIndex);
571         //set follower timeout to 2 mins, helps during debugging
572         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
573
574         leader = new Leader(actorContext);
575
576         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
577         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
578
579         //update follower timestamp
580         leader.markFollowerActive(FOLLOWER_ID);
581
582         ByteString bs = toByteString(leadersSnapshot);
583         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
584                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
585                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
586         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
587                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
588         fts.setSnapshotBytes(bs);
589         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
590
591         //send first chunk and no InstallSnapshotReply received yet
592         fts.getNextChunk();
593         fts.incrementChunkIndex();
594
595         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
596                 TimeUnit.MILLISECONDS);
597
598         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
599
600         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
601
602         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
603
604         //InstallSnapshotReply received
605         fts.markSendStatus(true);
606
607         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
608
609         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
610
611         assertEquals(commitIndex, is.getLastIncludedIndex());
612     }
613
614     @Test
615     public void testSendAppendEntriesSnapshotScenario() throws Exception {
616         logStart("testSendAppendEntriesSnapshotScenario");
617
618         final MockRaftActorContext actorContext = createActorContextWithFollower();
619
620         Map<String, String> leadersSnapshot = new HashMap<>();
621         leadersSnapshot.put("1", "A");
622         leadersSnapshot.put("2", "B");
623         leadersSnapshot.put("3", "C");
624
625         //clears leaders log
626         actorContext.getReplicatedLog().removeFrom(0);
627
628         final int followersLastIndex = 2;
629         final int snapshotIndex = 3;
630         final int newEntryIndex = 4;
631         final int snapshotTerm = 1;
632         final int currentTerm = 2;
633
634         // set the snapshot variables in replicatedlog
635         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
636         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
637         actorContext.setCommitIndex(followersLastIndex);
638
639         leader = new Leader(actorContext);
640
641         // Leader will send an immediate heartbeat - ignore it.
642         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
643
644         // new entry
645         SimpleReplicatedLogEntry entry =
646                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
647                         new MockRaftActorContext.MockPayload("D"));
648
649         actorContext.getReplicatedLog().append(entry);
650
651         //update follower timestamp
652         leader.markFollowerActive(FOLLOWER_ID);
653
654         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
655         RaftActorBehavior raftBehavior = leader.handleMessage(
656                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
657
658         assertTrue(raftBehavior instanceof Leader);
659
660         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
661     }
662
663     @Test
664     public void testInitiateInstallSnapshot() throws Exception {
665         logStart("testInitiateInstallSnapshot");
666
667         MockRaftActorContext actorContext = createActorContextWithFollower();
668
669         //clears leaders log
670         actorContext.getReplicatedLog().removeFrom(0);
671
672         final int followersLastIndex = 2;
673         final int snapshotIndex = 3;
674         final int newEntryIndex = 4;
675         final int snapshotTerm = 1;
676         final int currentTerm = 2;
677
678         // set the snapshot variables in replicatedlog
679         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681         actorContext.setLastApplied(3);
682         actorContext.setCommitIndex(followersLastIndex);
683
684         leader = new Leader(actorContext);
685
686         // Leader will send an immediate heartbeat - ignore it.
687         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
688
689         // set the snapshot as absent and check if capture-snapshot is invoked.
690         leader.setSnapshot(null);
691
692         // new entry
693         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
694                 new MockRaftActorContext.MockPayload("D"));
695
696         actorContext.getReplicatedLog().append(entry);
697
698         //update follower timestamp
699         leader.markFollowerActive(FOLLOWER_ID);
700
701         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
702
703         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
704
705         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
706
707         assertEquals(3, cs.getLastAppliedIndex());
708         assertEquals(1, cs.getLastAppliedTerm());
709         assertEquals(4, cs.getLastIndex());
710         assertEquals(2, cs.getLastTerm());
711
712         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
713         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
714
715         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
716     }
717
718     @Test
719     public void testInitiateForceInstallSnapshot() throws Exception {
720         logStart("testInitiateForceInstallSnapshot");
721
722         MockRaftActorContext actorContext = createActorContextWithFollower();
723
724         final int followersLastIndex = 2;
725         final int snapshotIndex = -1;
726         final int newEntryIndex = 4;
727         final int snapshotTerm = -1;
728         final int currentTerm = 2;
729
730         // set the snapshot variables in replicatedlog
731         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733         actorContext.setLastApplied(3);
734         actorContext.setCommitIndex(followersLastIndex);
735
736         actorContext.getReplicatedLog().removeFrom(0);
737
738         leader = new Leader(actorContext);
739         actorContext.setCurrentBehavior(leader);
740
741         // Leader will send an immediate heartbeat - ignore it.
742         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
743
744         // set the snapshot as absent and check if capture-snapshot is invoked.
745         leader.setSnapshot(null);
746
747         for (int i = 0; i < 4; i++) {
748             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
749                     new MockRaftActorContext.MockPayload("X" + i)));
750         }
751
752         // new entry
753         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
754                 new MockRaftActorContext.MockPayload("D"));
755
756         actorContext.getReplicatedLog().append(entry);
757
758         //update follower timestamp
759         leader.markFollowerActive(FOLLOWER_ID);
760
761         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
762         // installed with a SendInstallSnapshot
763         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
764
765         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
766
767         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
768
769         assertEquals(3, cs.getLastAppliedIndex());
770         assertEquals(1, cs.getLastAppliedTerm());
771         assertEquals(4, cs.getLastIndex());
772         assertEquals(2, cs.getLastTerm());
773
774         // if an initiate is started again when first is in progress, it should not initiate Capture
775         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
776
777         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
778     }
779
780
781     @Test
782     public void testInstallSnapshot() throws Exception {
783         logStart("testInstallSnapshot");
784
785         final MockRaftActorContext actorContext = createActorContextWithFollower();
786
787         Map<String, String> leadersSnapshot = new HashMap<>();
788         leadersSnapshot.put("1", "A");
789         leadersSnapshot.put("2", "B");
790         leadersSnapshot.put("3", "C");
791
792         //clears leaders log
793         actorContext.getReplicatedLog().removeFrom(0);
794
795         final int lastAppliedIndex = 3;
796         final int snapshotIndex = 2;
797         final int snapshotTerm = 1;
798         final int currentTerm = 2;
799
800         // set the snapshot variables in replicatedlog
801         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804         actorContext.setCommitIndex(lastAppliedIndex);
805         actorContext.setLastApplied(lastAppliedIndex);
806
807         leader = new Leader(actorContext);
808
809         // Initial heartbeat.
810         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
811
812         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
813         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
814
815         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
816         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
817                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
818
819         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
820                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
821
822         assertTrue(raftBehavior instanceof Leader);
823
824         // check if installsnapshot gets called with the correct values.
825
826         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
827                 InstallSnapshot.class);
828
829         assertNotNull(installSnapshot.getData());
830         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
831         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
832
833         assertEquals(currentTerm, installSnapshot.getTerm());
834     }
835
836     @Test
837     public void testForceInstallSnapshot() throws Exception {
838         logStart("testForceInstallSnapshot");
839
840         final MockRaftActorContext actorContext = createActorContextWithFollower();
841
842         Map<String, String> leadersSnapshot = new HashMap<>();
843         leadersSnapshot.put("1", "A");
844         leadersSnapshot.put("2", "B");
845         leadersSnapshot.put("3", "C");
846
847         final int lastAppliedIndex = 3;
848         final int snapshotIndex = -1;
849         final int snapshotTerm = -1;
850         final int currentTerm = 2;
851
852         // set the snapshot variables in replicatedlog
853         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856         actorContext.setCommitIndex(lastAppliedIndex);
857         actorContext.setLastApplied(lastAppliedIndex);
858
859         leader = new Leader(actorContext);
860
861         // Initial heartbeat.
862         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
863
864         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
866
867         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
868         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
869                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
870
871         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
872                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
873
874         assertTrue(raftBehavior instanceof Leader);
875
876         // check if installsnapshot gets called with the correct values.
877
878         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
879                 InstallSnapshot.class);
880
881         assertNotNull(installSnapshot.getData());
882         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
883         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
884
885         assertEquals(currentTerm, installSnapshot.getTerm());
886     }
887
888     @Test
889     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
890         logStart("testHandleInstallSnapshotReplyLastChunk");
891
892         MockRaftActorContext actorContext = createActorContextWithFollower();
893
894         final int commitIndex = 3;
895         final int snapshotIndex = 2;
896         final int snapshotTerm = 1;
897         final int currentTerm = 2;
898
899         actorContext.setCommitIndex(commitIndex);
900
901         leader = new Leader(actorContext);
902         actorContext.setCurrentBehavior(leader);
903
904         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
905         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
906
907         // Ignore initial heartbeat.
908         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
909
910         Map<String, String> leadersSnapshot = new HashMap<>();
911         leadersSnapshot.put("1", "A");
912         leadersSnapshot.put("2", "B");
913         leadersSnapshot.put("3", "C");
914
915         // set the snapshot variables in replicatedlog
916
917         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
918         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
919         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
920
921         ByteString bs = toByteString(leadersSnapshot);
922         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
923                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
924                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
925         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
926                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
927         fts.setSnapshotBytes(bs);
928         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
929         while (!fts.isLastChunk(fts.getChunkIndex())) {
930             fts.getNextChunk();
931             fts.incrementChunkIndex();
932         }
933
934         //clears leaders log
935         actorContext.getReplicatedLog().removeFrom(0);
936
937         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
938                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
939
940         assertTrue(raftBehavior instanceof Leader);
941
942         assertEquals(1, leader.followerLogSize());
943         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
944         assertNotNull(fli);
945         assertNull(fli.getInstallSnapshotState());
946         assertEquals(commitIndex, fli.getMatchIndex());
947         assertEquals(commitIndex + 1, fli.getNextIndex());
948         assertFalse(leader.hasSnapshot());
949     }
950
951     @Test
952     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
953         logStart("testSendSnapshotfromInstallSnapshotReply");
954
955         MockRaftActorContext actorContext = createActorContextWithFollower();
956
957         final int commitIndex = 3;
958         final int snapshotIndex = 2;
959         final int snapshotTerm = 1;
960         final int currentTerm = 2;
961
962         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
963             @Override
964             public int getSnapshotChunkSize() {
965                 return 50;
966             }
967         };
968         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
969         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
970
971         actorContext.setConfigParams(configParams);
972         actorContext.setCommitIndex(commitIndex);
973
974         leader = new Leader(actorContext);
975         actorContext.setCurrentBehavior(leader);
976
977         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
978         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
979
980         Map<String, String> leadersSnapshot = new HashMap<>();
981         leadersSnapshot.put("1", "A");
982         leadersSnapshot.put("2", "B");
983         leadersSnapshot.put("3", "C");
984
985         // set the snapshot variables in replicatedlog
986         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
987         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
988         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
989
990         ByteString bs = toByteString(leadersSnapshot);
991         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
992                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
993                 -1, null, null);
994
995         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
996
997         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
998                 InstallSnapshot.class);
999
1000         assertEquals(1, installSnapshot.getChunkIndex());
1001         assertEquals(3, installSnapshot.getTotalChunks());
1002
1003         followerActor.underlyingActor().clear();
1004         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1005                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1006
1007         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1008
1009         assertEquals(2, installSnapshot.getChunkIndex());
1010         assertEquals(3, installSnapshot.getTotalChunks());
1011
1012         followerActor.underlyingActor().clear();
1013         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1014                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1015
1016         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1017
1018         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1019         followerActor.underlyingActor().clear();
1020         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1021                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1022
1023         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1024
1025         assertNull(installSnapshot);
1026     }
1027
1028
1029     @Test
1030     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1031         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1032
1033         MockRaftActorContext actorContext = createActorContextWithFollower();
1034
1035         final int commitIndex = 3;
1036         final int snapshotIndex = 2;
1037         final int snapshotTerm = 1;
1038         final int currentTerm = 2;
1039
1040         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1041             @Override
1042             public int getSnapshotChunkSize() {
1043                 return 50;
1044             }
1045         });
1046
1047         actorContext.setCommitIndex(commitIndex);
1048
1049         leader = new Leader(actorContext);
1050
1051         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1052         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1053
1054         Map<String, String> leadersSnapshot = new HashMap<>();
1055         leadersSnapshot.put("1", "A");
1056         leadersSnapshot.put("2", "B");
1057         leadersSnapshot.put("3", "C");
1058
1059         // set the snapshot variables in replicatedlog
1060         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1061         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1062         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1063
1064         ByteString bs = toByteString(leadersSnapshot);
1065         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1066                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1067                 -1, null, null);
1068
1069         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1070         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1071
1072         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1073                 InstallSnapshot.class);
1074
1075         assertEquals(1, installSnapshot.getChunkIndex());
1076         assertEquals(3, installSnapshot.getTotalChunks());
1077
1078         followerActor.underlyingActor().clear();
1079
1080         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1081                 FOLLOWER_ID, -1, false));
1082
1083         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1084                 TimeUnit.MILLISECONDS);
1085
1086         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1087
1088         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1089
1090         assertEquals(1, installSnapshot.getChunkIndex());
1091         assertEquals(3, installSnapshot.getTotalChunks());
1092     }
1093
1094     @Test
1095     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1096         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1097
1098         MockRaftActorContext actorContext = createActorContextWithFollower();
1099
1100         final int commitIndex = 3;
1101         final int snapshotIndex = 2;
1102         final int snapshotTerm = 1;
1103         final int currentTerm = 2;
1104
1105         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1106             @Override
1107             public int getSnapshotChunkSize() {
1108                 return 50;
1109             }
1110         });
1111
1112         actorContext.setCommitIndex(commitIndex);
1113
1114         leader = new Leader(actorContext);
1115
1116         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1117         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1118
1119         Map<String, String> leadersSnapshot = new HashMap<>();
1120         leadersSnapshot.put("1", "A");
1121         leadersSnapshot.put("2", "B");
1122         leadersSnapshot.put("3", "C");
1123
1124         // set the snapshot variables in replicatedlog
1125         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1126         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1127         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1128
1129         ByteString bs = toByteString(leadersSnapshot);
1130         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1131                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1132                 -1, null, null);
1133
1134         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1135
1136         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1137                 InstallSnapshot.class);
1138
1139         assertEquals(1, installSnapshot.getChunkIndex());
1140         assertEquals(3, installSnapshot.getTotalChunks());
1141         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1142                 installSnapshot.getLastChunkHashCode().get().intValue());
1143
1144         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1145
1146         followerActor.underlyingActor().clear();
1147
1148         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1149                 FOLLOWER_ID, 1, true));
1150
1151         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1152
1153         assertEquals(2, installSnapshot.getChunkIndex());
1154         assertEquals(3, installSnapshot.getTotalChunks());
1155         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1156     }
1157
1158     @Test
1159     public void testLeaderInstallSnapshotState() {
1160         logStart("testLeaderInstallSnapshotState");
1161
1162         Map<String, String> leadersSnapshot = new HashMap<>();
1163         leadersSnapshot.put("1", "A");
1164         leadersSnapshot.put("2", "B");
1165         leadersSnapshot.put("3", "C");
1166
1167         ByteString bs = toByteString(leadersSnapshot);
1168         byte[] barray = bs.toByteArray();
1169
1170         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1171         fts.setSnapshotBytes(bs);
1172
1173         assertEquals(bs.size(), barray.length);
1174
1175         int chunkIndex = 0;
1176         for (int i = 0; i < barray.length; i = i + 50) {
1177             int length = i + 50;
1178             chunkIndex++;
1179
1180             if (i + 50 > barray.length) {
1181                 length = barray.length;
1182             }
1183
1184             byte[] chunk = fts.getNextChunk();
1185             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1186             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1187
1188             fts.markSendStatus(true);
1189             if (!fts.isLastChunk(chunkIndex)) {
1190                 fts.incrementChunkIndex();
1191             }
1192         }
1193
1194         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1195     }
1196
1197     @Override
1198     protected Leader createBehavior(final RaftActorContext actorContext) {
1199         return new Leader(actorContext);
1200     }
1201
1202     @Override
1203     protected MockRaftActorContext createActorContext() {
1204         return createActorContext(leaderActor);
1205     }
1206
1207     @Override
1208     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1209         return createActorContext(LEADER_ID, actorRef);
1210     }
1211
1212     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1213         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1214         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1215         configParams.setElectionTimeoutFactor(100000);
1216         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1217         context.setConfigParams(configParams);
1218         context.setPayloadVersion(payloadVersion);
1219         return context;
1220     }
1221
1222     private MockRaftActorContext createActorContextWithFollower() {
1223         MockRaftActorContext actorContext = createActorContext();
1224         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1225                 followerActor.path().toString()).build());
1226         return actorContext;
1227     }
1228
1229     private MockRaftActorContext createFollowerActorContextWithLeader() {
1230         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1231         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1232         followerConfig.setElectionTimeoutFactor(10000);
1233         followerActorContext.setConfigParams(followerConfig);
1234         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1235         return followerActorContext;
1236     }
1237
1238     @Test
1239     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1240         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1241
1242         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1243
1244         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1245
1246         Follower follower = new Follower(followerActorContext);
1247         followerActor.underlyingActor().setBehavior(follower);
1248         followerActorContext.setCurrentBehavior(follower);
1249
1250         Map<String, String> peerAddresses = new HashMap<>();
1251         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1252
1253         leaderActorContext.setPeerAddresses(peerAddresses);
1254
1255         leaderActorContext.getReplicatedLog().removeFrom(0);
1256
1257         //create 3 entries
1258         leaderActorContext.setReplicatedLog(
1259                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1260
1261         leaderActorContext.setCommitIndex(1);
1262
1263         followerActorContext.getReplicatedLog().removeFrom(0);
1264
1265         // follower too has the exact same log entries and has the same commit index
1266         followerActorContext.setReplicatedLog(
1267                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1268
1269         followerActorContext.setCommitIndex(1);
1270
1271         leader = new Leader(leaderActorContext);
1272         leaderActorContext.setCurrentBehavior(leader);
1273
1274         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1275
1276         assertEquals(-1, appendEntries.getLeaderCommit());
1277         assertEquals(0, appendEntries.getEntries().size());
1278         assertEquals(0, appendEntries.getPrevLogIndex());
1279
1280         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1281                 leaderActor, AppendEntriesReply.class);
1282
1283         assertEquals(2, appendEntriesReply.getLogLastIndex());
1284         assertEquals(1, appendEntriesReply.getLogLastTerm());
1285
1286         // follower returns its next index
1287         assertEquals(2, appendEntriesReply.getLogLastIndex());
1288         assertEquals(1, appendEntriesReply.getLogLastTerm());
1289
1290         follower.close();
1291     }
1292
1293     @Test
1294     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1295         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1296
1297         final MockRaftActorContext leaderActorContext = createActorContext();
1298
1299         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1300         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1301
1302         Follower follower = new Follower(followerActorContext);
1303         followerActor.underlyingActor().setBehavior(follower);
1304         followerActorContext.setCurrentBehavior(follower);
1305
1306         Map<String, String> leaderPeerAddresses = new HashMap<>();
1307         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1308
1309         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1310
1311         leaderActorContext.getReplicatedLog().removeFrom(0);
1312
1313         leaderActorContext.setReplicatedLog(
1314                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315
1316         leaderActorContext.setCommitIndex(1);
1317
1318         followerActorContext.getReplicatedLog().removeFrom(0);
1319
1320         followerActorContext.setReplicatedLog(
1321                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1322
1323         // follower has the same log entries but its commit index > leaders commit index
1324         followerActorContext.setCommitIndex(2);
1325
1326         leader = new Leader(leaderActorContext);
1327
1328         // Initial heartbeat
1329         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1330
1331         assertEquals(-1, appendEntries.getLeaderCommit());
1332         assertEquals(0, appendEntries.getEntries().size());
1333         assertEquals(0, appendEntries.getPrevLogIndex());
1334
1335         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1336                 leaderActor, AppendEntriesReply.class);
1337
1338         assertEquals(2, appendEntriesReply.getLogLastIndex());
1339         assertEquals(1, appendEntriesReply.getLogLastTerm());
1340
1341         leaderActor.underlyingActor().setBehavior(follower);
1342         leader.handleMessage(followerActor, appendEntriesReply);
1343
1344         leaderActor.underlyingActor().clear();
1345         followerActor.underlyingActor().clear();
1346
1347         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1348                 TimeUnit.MILLISECONDS);
1349
1350         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1351
1352         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1353
1354         assertEquals(2, appendEntries.getLeaderCommit());
1355         assertEquals(0, appendEntries.getEntries().size());
1356         assertEquals(2, appendEntries.getPrevLogIndex());
1357
1358         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1359
1360         assertEquals(2, appendEntriesReply.getLogLastIndex());
1361         assertEquals(1, appendEntriesReply.getLogLastTerm());
1362
1363         assertEquals(2, followerActorContext.getCommitIndex());
1364
1365         follower.close();
1366     }
1367
1368     @Test
1369     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1370         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1371
1372         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1373         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1374                 new FiniteDuration(1000, TimeUnit.SECONDS));
1375
1376         leaderActorContext.setReplicatedLog(
1377                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1378         long leaderCommitIndex = 2;
1379         leaderActorContext.setCommitIndex(leaderCommitIndex);
1380         leaderActorContext.setLastApplied(leaderCommitIndex);
1381
1382         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1383         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1384
1385         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1386
1387         followerActorContext.setReplicatedLog(
1388                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1389         followerActorContext.setCommitIndex(0);
1390         followerActorContext.setLastApplied(0);
1391
1392         Follower follower = new Follower(followerActorContext);
1393         followerActor.underlyingActor().setBehavior(follower);
1394
1395         leader = new Leader(leaderActorContext);
1396
1397         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1398         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1399                 AppendEntriesReply.class);
1400
1401         MessageCollectorActor.clearMessages(followerActor);
1402         MessageCollectorActor.clearMessages(leaderActor);
1403
1404         // Verify initial AppendEntries sent.
1405         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1406         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1407         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1408
1409         leaderActor.underlyingActor().setBehavior(leader);
1410
1411         leader.handleMessage(followerActor, appendEntriesReply);
1412
1413         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1414         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1415
1416         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1417         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1418         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1419
1420         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1421         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1422                 appendEntries.getEntries().get(0).getData());
1423         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1424         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1425                 appendEntries.getEntries().get(1).getData());
1426
1427         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1428         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1429
1430         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1431
1432         ApplyState applyState = applyStateList.get(0);
1433         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1434         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1435         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1436                 applyState.getReplicatedLogEntry().getData());
1437
1438         applyState = applyStateList.get(1);
1439         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1440         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1441         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1442                 applyState.getReplicatedLogEntry().getData());
1443
1444         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1445         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1446     }
1447
1448     @Test
1449     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1450         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1451
1452         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1453         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1454                 new FiniteDuration(1000, TimeUnit.SECONDS));
1455
1456         leaderActorContext.setReplicatedLog(
1457                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1458         long leaderCommitIndex = 1;
1459         leaderActorContext.setCommitIndex(leaderCommitIndex);
1460         leaderActorContext.setLastApplied(leaderCommitIndex);
1461
1462         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1463         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1464
1465         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1466
1467         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1468         followerActorContext.setCommitIndex(-1);
1469         followerActorContext.setLastApplied(-1);
1470
1471         Follower follower = new Follower(followerActorContext);
1472         followerActor.underlyingActor().setBehavior(follower);
1473         followerActorContext.setCurrentBehavior(follower);
1474
1475         leader = new Leader(leaderActorContext);
1476
1477         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1478         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1479                 AppendEntriesReply.class);
1480
1481         MessageCollectorActor.clearMessages(followerActor);
1482         MessageCollectorActor.clearMessages(leaderActor);
1483
1484         // Verify initial AppendEntries sent with the leader's current commit index.
1485         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1486         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1487         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1488
1489         leaderActor.underlyingActor().setBehavior(leader);
1490         leaderActorContext.setCurrentBehavior(leader);
1491
1492         leader.handleMessage(followerActor, appendEntriesReply);
1493
1494         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1495         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1496
1497         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1498         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1499         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1500
1501         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1502         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1503                 appendEntries.getEntries().get(0).getData());
1504         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1505         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1506                 appendEntries.getEntries().get(1).getData());
1507
1508         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1509         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1510
1511         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1512
1513         ApplyState applyState = applyStateList.get(0);
1514         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1515         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1516         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1517                 applyState.getReplicatedLogEntry().getData());
1518
1519         applyState = applyStateList.get(1);
1520         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1521         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1522         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1523                 applyState.getReplicatedLogEntry().getData());
1524
1525         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1526         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1527     }
1528
1529     @Test
1530     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1531         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1532
1533         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1534         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1535                 new FiniteDuration(1000, TimeUnit.SECONDS));
1536
1537         leaderActorContext.setReplicatedLog(
1538                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1539         long leaderCommitIndex = 1;
1540         leaderActorContext.setCommitIndex(leaderCommitIndex);
1541         leaderActorContext.setLastApplied(leaderCommitIndex);
1542
1543         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1544         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1545
1546         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1547
1548         followerActorContext.setReplicatedLog(
1549                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1550         followerActorContext.setCommitIndex(-1);
1551         followerActorContext.setLastApplied(-1);
1552
1553         Follower follower = new Follower(followerActorContext);
1554         followerActor.underlyingActor().setBehavior(follower);
1555         followerActorContext.setCurrentBehavior(follower);
1556
1557         leader = new Leader(leaderActorContext);
1558
1559         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1560         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1561                 AppendEntriesReply.class);
1562
1563         MessageCollectorActor.clearMessages(followerActor);
1564         MessageCollectorActor.clearMessages(leaderActor);
1565
1566         // Verify initial AppendEntries sent with the leader's current commit index.
1567         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1568         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1569         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1570
1571         leaderActor.underlyingActor().setBehavior(leader);
1572         leaderActorContext.setCurrentBehavior(leader);
1573
1574         leader.handleMessage(followerActor, appendEntriesReply);
1575
1576         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1577         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1578
1579         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1580         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1581         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1582
1583         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1584         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1585         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1586                 appendEntries.getEntries().get(0).getData());
1587         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1588         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1589         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1590                 appendEntries.getEntries().get(1).getData());
1591
1592         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1593         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1594
1595         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1596
1597         ApplyState applyState = applyStateList.get(0);
1598         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1599         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1601                 applyState.getReplicatedLogEntry().getData());
1602
1603         applyState = applyStateList.get(1);
1604         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1605         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1606         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1607                 applyState.getReplicatedLogEntry().getData());
1608
1609         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1610         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1611         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1612     }
1613
1614     @Test
1615     public void testHandleAppendEntriesReplyWithNewerTerm() {
1616         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1617
1618         MockRaftActorContext leaderActorContext = createActorContext();
1619         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1620                 new FiniteDuration(10000, TimeUnit.SECONDS));
1621
1622         leaderActorContext.setReplicatedLog(
1623                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1624
1625         leader = new Leader(leaderActorContext);
1626         leaderActor.underlyingActor().setBehavior(leader);
1627         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1628
1629         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1630                 AppendEntriesReply.class);
1631
1632         assertEquals(false, appendEntriesReply.isSuccess());
1633         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1634
1635         MessageCollectorActor.clearMessages(leaderActor);
1636     }
1637
1638     @Test
1639     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1640         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1641
1642         MockRaftActorContext leaderActorContext = createActorContext();
1643         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1644                 new FiniteDuration(10000, TimeUnit.SECONDS));
1645
1646         leaderActorContext.setReplicatedLog(
1647                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1648         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1649
1650         leader = new Leader(leaderActorContext);
1651         leaderActor.underlyingActor().setBehavior(leader);
1652         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1653
1654         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1655                 AppendEntriesReply.class);
1656
1657         assertEquals(false, appendEntriesReply.isSuccess());
1658         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1659
1660         MessageCollectorActor.clearMessages(leaderActor);
1661     }
1662
1663     @Test
1664     public void testHandleAppendEntriesReplySuccess() throws Exception {
1665         logStart("testHandleAppendEntriesReplySuccess");
1666
1667         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1668
1669         leaderActorContext.setReplicatedLog(
1670                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1671
1672         leaderActorContext.setCommitIndex(1);
1673         leaderActorContext.setLastApplied(1);
1674         leaderActorContext.getTermInformation().update(1, "leader");
1675
1676         leader = new Leader(leaderActorContext);
1677
1678         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1679
1680         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1681         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1682
1683         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1684
1685         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1686
1687         assertEquals(RaftState.Leader, raftActorBehavior.state());
1688
1689         assertEquals(2, leaderActorContext.getCommitIndex());
1690
1691         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1692                 leaderActor, ApplyJournalEntries.class);
1693
1694         assertEquals(2, leaderActorContext.getLastApplied());
1695
1696         assertEquals(2, applyJournalEntries.getToIndex());
1697
1698         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1699                 ApplyState.class);
1700
1701         assertEquals(1,applyStateList.size());
1702
1703         ApplyState applyState = applyStateList.get(0);
1704
1705         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1706
1707         assertEquals(2, followerInfo.getMatchIndex());
1708         assertEquals(3, followerInfo.getNextIndex());
1709         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1710         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1711     }
1712
1713     @Test
1714     public void testHandleAppendEntriesReplyUnknownFollower() {
1715         logStart("testHandleAppendEntriesReplyUnknownFollower");
1716
1717         MockRaftActorContext leaderActorContext = createActorContext();
1718
1719         leader = new Leader(leaderActorContext);
1720
1721         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1722
1723         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1724
1725         assertEquals(RaftState.Leader, raftActorBehavior.state());
1726     }
1727
1728     @Test
1729     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1730         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1731
1732         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1733         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1734                 new FiniteDuration(1000, TimeUnit.SECONDS));
1735         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1736
1737         leaderActorContext.setReplicatedLog(
1738                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1739         long leaderCommitIndex = 3;
1740         leaderActorContext.setCommitIndex(leaderCommitIndex);
1741         leaderActorContext.setLastApplied(leaderCommitIndex);
1742
1743         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1744         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1745         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1746         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1747
1748         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1749
1750         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1751         followerActorContext.setCommitIndex(-1);
1752         followerActorContext.setLastApplied(-1);
1753
1754         Follower follower = new Follower(followerActorContext);
1755         followerActor.underlyingActor().setBehavior(follower);
1756         followerActorContext.setCurrentBehavior(follower);
1757
1758         leader = new Leader(leaderActorContext);
1759
1760         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1761         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1762                 AppendEntriesReply.class);
1763
1764         MessageCollectorActor.clearMessages(followerActor);
1765         MessageCollectorActor.clearMessages(leaderActor);
1766
1767         // Verify initial AppendEntries sent with the leader's current commit index.
1768         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1769         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1770         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1771
1772         leaderActor.underlyingActor().setBehavior(leader);
1773         leaderActorContext.setCurrentBehavior(leader);
1774
1775         leader.handleMessage(followerActor, appendEntriesReply);
1776
1777         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1778                 AppendEntries.class, 2);
1779         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1780
1781         appendEntries = appendEntriesList.get(0);
1782         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1783         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1784         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1785
1786         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1787         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1788                 appendEntries.getEntries().get(0).getData());
1789         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1790         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1791                 appendEntries.getEntries().get(1).getData());
1792
1793         appendEntries = appendEntriesList.get(1);
1794         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1795         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1796         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1797
1798         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1799         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1800                 appendEntries.getEntries().get(0).getData());
1801         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1802         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1803                 appendEntries.getEntries().get(1).getData());
1804
1805         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1806         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1807
1808         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1809
1810         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1811         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1812     }
1813
1814     @Test
1815     public void testHandleRequestVoteReply() {
1816         logStart("testHandleRequestVoteReply");
1817
1818         MockRaftActorContext leaderActorContext = createActorContext();
1819
1820         leader = new Leader(leaderActorContext);
1821
1822         // Should be a no-op.
1823         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1824                 new RequestVoteReply(1, true));
1825
1826         assertEquals(RaftState.Leader, raftActorBehavior.state());
1827
1828         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1829
1830         assertEquals(RaftState.Leader, raftActorBehavior.state());
1831     }
1832
1833     @Test
1834     public void testIsolatedLeaderCheckNoFollowers() {
1835         logStart("testIsolatedLeaderCheckNoFollowers");
1836
1837         MockRaftActorContext leaderActorContext = createActorContext();
1838
1839         leader = new Leader(leaderActorContext);
1840         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1841         assertTrue(newBehavior instanceof Leader);
1842     }
1843
1844     @Test
1845     public void testIsolatedLeaderCheckNoVotingFollowers() {
1846         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1847
1848         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1849         Follower follower = new Follower(followerActorContext);
1850         followerActor.underlyingActor().setBehavior(follower);
1851
1852         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1853         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1854                 new FiniteDuration(1000, TimeUnit.SECONDS));
1855         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1856
1857         leader = new Leader(leaderActorContext);
1858         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1859         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1860         assertTrue("Expected Leader", newBehavior instanceof Leader);
1861     }
1862
1863     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1864         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1865         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1866
1867         MockRaftActorContext leaderActorContext = createActorContext();
1868
1869         Map<String, String> peerAddresses = new HashMap<>();
1870         peerAddresses.put("follower-1", followerActor1.path().toString());
1871         peerAddresses.put("follower-2", followerActor2.path().toString());
1872
1873         leaderActorContext.setPeerAddresses(peerAddresses);
1874         leaderActorContext.setRaftPolicy(raftPolicy);
1875
1876         leader = new Leader(leaderActorContext);
1877
1878         leader.markFollowerActive("follower-1");
1879         leader.markFollowerActive("follower-2");
1880         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1881         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1882
1883         // kill 1 follower and verify if that got killed
1884         final JavaTestKit probe = new JavaTestKit(getSystem());
1885         probe.watch(followerActor1);
1886         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1887         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1888         assertEquals(termMsg1.getActor(), followerActor1);
1889
1890         leader.markFollowerInActive("follower-1");
1891         leader.markFollowerActive("follower-2");
1892         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1893         assertTrue("Behavior not instance of Leader when majority of followers are active",
1894                 newBehavior instanceof Leader);
1895
1896         // kill 2nd follower and leader should change to Isolated leader
1897         followerActor2.tell(PoisonPill.getInstance(), null);
1898         probe.watch(followerActor2);
1899         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1900         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1901         assertEquals(termMsg2.getActor(), followerActor2);
1902
1903         leader.markFollowerInActive("follower-2");
1904         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1905     }
1906
1907     @Test
1908     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1909         logStart("testIsolatedLeaderCheckTwoFollowers");
1910
1911         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1912
1913         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1914             newBehavior instanceof IsolatedLeader);
1915     }
1916
1917     @Test
1918     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1919         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1920
1921         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1922
1923         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1924                 newBehavior instanceof Leader);
1925     }
1926
1927     @Test
1928     public void testLaggingFollowerStarvation() throws Exception {
1929         logStart("testLaggingFollowerStarvation");
1930
1931         String leaderActorId = actorFactory.generateActorId("leader");
1932         String follower1ActorId = actorFactory.generateActorId("follower");
1933         String follower2ActorId = actorFactory.generateActorId("follower");
1934
1935         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1936         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1937
1938         MockRaftActorContext leaderActorContext =
1939                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1940
1941         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1942         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1943         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1944
1945         leaderActorContext.setConfigParams(configParams);
1946
1947         leaderActorContext.setReplicatedLog(
1948                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1949
1950         Map<String, String> peerAddresses = new HashMap<>();
1951         peerAddresses.put(follower1ActorId,
1952                 follower1Actor.path().toString());
1953         peerAddresses.put(follower2ActorId,
1954                 follower2Actor.path().toString());
1955
1956         leaderActorContext.setPeerAddresses(peerAddresses);
1957         leaderActorContext.getTermInformation().update(1, leaderActorId);
1958
1959         leader = createBehavior(leaderActorContext);
1960
1961         leaderActor.underlyingActor().setBehavior(leader);
1962
1963         for (int i = 1; i < 6; i++) {
1964             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1965             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1966                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1967             assertTrue(newBehavior == leader);
1968             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1969         }
1970
1971         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1972         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1973
1974         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1975                 heartbeats.size() > 1);
1976
1977         // Check if follower-2 got AppendEntries during this time and was not starved
1978         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1979
1980         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1981                 appendEntries.size() > 1);
1982     }
1983
1984     @Test
1985     public void testReplicationConsensusWithNonVotingFollower() {
1986         logStart("testReplicationConsensusWithNonVotingFollower");
1987
1988         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1989         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1990                 new FiniteDuration(1000, TimeUnit.SECONDS));
1991
1992         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1993         leaderActorContext.setCommitIndex(-1);
1994         leaderActorContext.setLastApplied(-1);
1995
1996         String nonVotingFollowerId = "nonvoting-follower";
1997         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1998                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1999
2000         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2001                 VotingState.NON_VOTING);
2002
2003         leader = new Leader(leaderActorContext);
2004         leaderActorContext.setCurrentBehavior(leader);
2005
2006         // Ignore initial heartbeats
2007         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2008         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2009
2010         MessageCollectorActor.clearMessages(followerActor);
2011         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2012         MessageCollectorActor.clearMessages(leaderActor);
2013
2014         // Send a Replicate message and wait for AppendEntries.
2015         sendReplicate(leaderActorContext, 0);
2016
2017         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2018         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2019
2020         // Send reply only from the voting follower and verify consensus via ApplyState.
2021         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2022
2023         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2024
2025         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2026
2027         MessageCollectorActor.clearMessages(followerActor);
2028         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2029         MessageCollectorActor.clearMessages(leaderActor);
2030
2031         // Send another Replicate message
2032         sendReplicate(leaderActorContext, 1);
2033
2034         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2035         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2036                 AppendEntries.class);
2037         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2038         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2039
2040         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2041         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2042
2043         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2044
2045         // Send reply from the voting follower and verify consensus.
2046         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2047
2048         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2049     }
2050
2051     @Test
2052     public void testTransferLeadershipWithFollowerInSync() {
2053         logStart("testTransferLeadershipWithFollowerInSync");
2054
2055         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2056         leaderActorContext.setLastApplied(-1);
2057         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2058                 new FiniteDuration(1000, TimeUnit.SECONDS));
2059         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2060
2061         leader = new Leader(leaderActorContext);
2062         leaderActorContext.setCurrentBehavior(leader);
2063
2064         // Initial heartbeat
2065         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2067         MessageCollectorActor.clearMessages(followerActor);
2068
2069         sendReplicate(leaderActorContext, 0);
2070         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2071
2072         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2073         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2074         MessageCollectorActor.clearMessages(followerActor);
2075
2076         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2077         leader.transferLeadership(mockTransferCohort);
2078
2079         verify(mockTransferCohort, never()).transferComplete();
2080         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2081         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2082
2083         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2084         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2085
2086         // Leader should force an election timeout
2087         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2088
2089         verify(mockTransferCohort).transferComplete();
2090     }
2091
2092     @Test
2093     public void testTransferLeadershipWithEmptyLog() {
2094         logStart("testTransferLeadershipWithEmptyLog");
2095
2096         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2097         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2098                 new FiniteDuration(1000, TimeUnit.SECONDS));
2099         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2100
2101         leader = new Leader(leaderActorContext);
2102         leaderActorContext.setCurrentBehavior(leader);
2103
2104         // Initial heartbeat
2105         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107         MessageCollectorActor.clearMessages(followerActor);
2108
2109         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2110         leader.transferLeadership(mockTransferCohort);
2111
2112         verify(mockTransferCohort, never()).transferComplete();
2113         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2114         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2115
2116         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2117         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2118
2119         // Leader should force an election timeout
2120         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2121
2122         verify(mockTransferCohort).transferComplete();
2123     }
2124
2125     @Test
2126     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2127         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2128
2129         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2130         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2131                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2132
2133         leader = new Leader(leaderActorContext);
2134         leaderActorContext.setCurrentBehavior(leader);
2135
2136         // Initial heartbeat
2137         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2138         MessageCollectorActor.clearMessages(followerActor);
2139
2140         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2141         leader.transferLeadership(mockTransferCohort);
2142
2143         verify(mockTransferCohort, never()).transferComplete();
2144
2145         // Sync up the follower.
2146         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2148         MessageCollectorActor.clearMessages(followerActor);
2149
2150         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2151                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2152         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2153         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2155
2156         // Leader should force an election timeout
2157         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2158
2159         verify(mockTransferCohort).transferComplete();
2160     }
2161
2162     @Test
2163     public void testTransferLeadershipWithFollowerSyncTimeout() {
2164         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2165
2166         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2167         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2168                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2169         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2170         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2171
2172         leader = new Leader(leaderActorContext);
2173         leaderActorContext.setCurrentBehavior(leader);
2174
2175         // Initial heartbeat
2176         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2177         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2178         MessageCollectorActor.clearMessages(followerActor);
2179
2180         sendReplicate(leaderActorContext, 0);
2181         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2182
2183         MessageCollectorActor.clearMessages(followerActor);
2184
2185         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2186         leader.transferLeadership(mockTransferCohort);
2187
2188         verify(mockTransferCohort, never()).transferComplete();
2189
2190         // Send heartbeats to time out the transfer.
2191         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2192             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2193                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2194             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2195         }
2196
2197         verify(mockTransferCohort).abortTransfer();
2198         verify(mockTransferCohort, never()).transferComplete();
2199         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2200     }
2201
2202     @Override
2203     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2204             ActorRef actorRef, RaftRPC rpc) throws Exception {
2205         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2206         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2207     }
2208
2209     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2210
2211         private final long electionTimeOutIntervalMillis;
2212         private final int snapshotChunkSize;
2213
2214         MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2215             super();
2216             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2217             this.snapshotChunkSize = snapshotChunkSize;
2218         }
2219
2220         @Override
2221         public FiniteDuration getElectionTimeOutInterval() {
2222             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2223         }
2224
2225         @Override
2226         public int getSnapshotChunkSize() {
2227             return snapshotChunkSize;
2228         }
2229     }
2230 }