Remove MockReplicatedLogEntry
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.google.protobuf.ByteString;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.TimeUnit;
36 import org.junit.After;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
39 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
40 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorContext;
42 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
43 import org.opendaylight.controller.cluster.raft.RaftState;
44 import org.opendaylight.controller.cluster.raft.RaftVersions;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
63 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
65 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.yangtools.concepts.Identifier;
68 import scala.concurrent.duration.FiniteDuration;
69
70 public class LeaderTest extends AbstractLeaderTest<Leader> {
71
72     static final String FOLLOWER_ID = "follower";
73     public static final String LEADER_ID = "leader";
74
75     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
76             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
77
78     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
79             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
80
81     private Leader leader;
82     private final short payloadVersion = 5;
83
84     @Override
85     @After
86     public void tearDown() throws Exception {
87         if (leader != null) {
88             leader.close();
89         }
90
91         super.tearDown();
92     }
93
94     @Test
95     public void testHandleMessageForUnknownMessage() throws Exception {
96         logStart("testHandleMessageForUnknownMessage");
97
98         leader = new Leader(createActorContext());
99
100         // handle message should null when it receives an unknown message
101         assertNull(leader.handleMessage(followerActor, "foo"));
102     }
103
104     @Test
105     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107
108         MockRaftActorContext actorContext = createActorContextWithFollower();
109         actorContext.setCommitIndex(-1);
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116         actorContext.setCurrentBehavior(leader);
117
118         // Leader should send an immediate heartbeat with no entries as follower is inactive.
119         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
120         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getTerm", term, appendEntries.getTerm());
122         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 0, appendEntries.getEntries().size());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126
127         // The follower would normally reply - simulate that explicitly here.
128         leader.handleMessage(followerActor, new AppendEntriesReply(
129                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
131
132         followerActor.underlyingActor().clear();
133
134         // Sleep for the heartbeat interval so AppendEntries is sent.
135         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
136                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137
138         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139
140         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143         assertEquals("Entries size", 1, appendEntries.getEntries().size());
144         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147     }
148
149
150     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
151         return sendReplicate(actorContext, 1, index);
152     }
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
155         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
159     }
160
161     @Test
162     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
163         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
164
165         MockRaftActorContext actorContext = createActorContextWithFollower();
166
167         long term = 1;
168         actorContext.getTermInformation().update(term, "");
169
170         leader = new Leader(actorContext);
171
172         // Leader will send an immediate heartbeat - ignore it.
173         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
174
175         // The follower would normally reply - simulate that explicitly here.
176         long lastIndex = actorContext.getReplicatedLog().lastIndex();
177         leader.handleMessage(followerActor, new AppendEntriesReply(
178                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
179         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
180
181         followerActor.underlyingActor().clear();
182
183         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
184
185         // State should not change
186         assertTrue(raftBehavior instanceof Leader);
187
188         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
190         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
191         assertEquals("Entries size", 1, appendEntries.getEntries().size());
192         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
193         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
194         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
195         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
196     }
197
198     @Test
199     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
200         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
201
202         MockRaftActorContext actorContext = createActorContextWithFollower();
203         actorContext.setCommitIndex(-1);
204         actorContext.setLastApplied(-1);
205
206         // The raft context is initialized with a couple log entries. However the commitIndex
207         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
208         // committed and applied. Now it regains leadership with a higher term (2).
209         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
210         long newTerm = prevTerm + 1;
211         actorContext.getTermInformation().update(newTerm, "");
212
213         leader = new Leader(actorContext);
214         actorContext.setCurrentBehavior(leader);
215
216         // Leader will send an immediate heartbeat - ignore it.
217         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
218
219         // The follower replies with the leader's current last index and term, simulating that it is
220         // up to date with the leader.
221         long lastIndex = actorContext.getReplicatedLog().lastIndex();
222         leader.handleMessage(followerActor, new AppendEntriesReply(
223                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
224
225         // The commit index should not get updated even though consensus was reached. This is b/c the
226         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
227         // from previous terms by counting replicas".
228         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
229
230         followerActor.underlyingActor().clear();
231
232         // Now replicate a new entry with the new term 2.
233         long newIndex = lastIndex + 1;
234         sendReplicate(actorContext, newTerm, newIndex);
235
236         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
238         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
239         assertEquals("Entries size", 1, appendEntries.getEntries().size());
240         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
241         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
242         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
243
244         // The follower replies with success. The leader should now update the commit index to the new index
245         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
246         // prior entries are committed indirectly".
247         leader.handleMessage(followerActor, new AppendEntriesReply(
248                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
249
250         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
251     }
252
253     @Test
254     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
255         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
256
257         MockRaftActorContext actorContext = createActorContextWithFollower();
258         actorContext.setRaftPolicy(createRaftPolicy(true, true));
259
260         long term = 1;
261         actorContext.getTermInformation().update(term, "");
262
263         leader = new Leader(actorContext);
264
265         // Leader will send an immediate heartbeat - ignore it.
266         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
267
268         // The follower would normally reply - simulate that explicitly here.
269         long lastIndex = actorContext.getReplicatedLog().lastIndex();
270         leader.handleMessage(followerActor, new AppendEntriesReply(
271                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
272         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
273
274         followerActor.underlyingActor().clear();
275
276         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
277
278         // State should not change
279         assertTrue(raftBehavior instanceof Leader);
280
281         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
283         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
284         assertEquals("Entries size", 1, appendEntries.getEntries().size());
285         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
286         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
287         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
288         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
289     }
290
291     @Test
292     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
293         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
294
295         MockRaftActorContext actorContext = createActorContextWithFollower();
296         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
297             @Override
298             public FiniteDuration getHeartBeatInterval() {
299                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
300             }
301         });
302
303         long term = 1;
304         actorContext.getTermInformation().update(term, "");
305
306         leader = new Leader(actorContext);
307
308         // Leader will send an immediate heartbeat - ignore it.
309         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
310
311         // The follower would normally reply - simulate that explicitly here.
312         long lastIndex = actorContext.getReplicatedLog().lastIndex();
313         leader.handleMessage(followerActor, new AppendEntriesReply(
314                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
315         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
316
317         followerActor.underlyingActor().clear();
318
319         for (int i = 0; i < 5; i++) {
320             sendReplicate(actorContext, lastIndex + i + 1);
321         }
322
323         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
324         // We expect only 1 message to be sent because of two reasons,
325         // - an append entries reply was not received
326         // - the heartbeat interval has not expired
327         // In this scenario if multiple messages are sent they would likely be duplicates
328         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
329     }
330
331     @Test
332     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
333         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
334
335         MockRaftActorContext actorContext = createActorContextWithFollower();
336         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
337             @Override
338             public FiniteDuration getHeartBeatInterval() {
339                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
340             }
341         });
342
343         long term = 1;
344         actorContext.getTermInformation().update(term, "");
345
346         leader = new Leader(actorContext);
347
348         // Leader will send an immediate heartbeat - ignore it.
349         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
350
351         // The follower would normally reply - simulate that explicitly here.
352         long lastIndex = actorContext.getReplicatedLog().lastIndex();
353         leader.handleMessage(followerActor, new AppendEntriesReply(
354                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
355         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
356
357         followerActor.underlyingActor().clear();
358
359         for (int i = 0; i < 3; i++) {
360             sendReplicate(actorContext, lastIndex + i + 1);
361             leader.handleMessage(followerActor, new AppendEntriesReply(
362                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
363
364         }
365
366         for (int i = 3; i < 5; i++) {
367             sendReplicate(actorContext, lastIndex + i + 1);
368         }
369
370         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
371         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
372         // get sent to the follower - but not the 5th
373         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
374
375         for (int i = 0; i < 4; i++) {
376             long expected = allMessages.get(i).getEntries().get(0).getIndex();
377             assertEquals(expected, i + 2);
378         }
379     }
380
381     @Test
382     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
383         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
384
385         MockRaftActorContext actorContext = createActorContextWithFollower();
386         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
387             @Override
388             public FiniteDuration getHeartBeatInterval() {
389                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
390             }
391         });
392
393         long term = 1;
394         actorContext.getTermInformation().update(term, "");
395
396         leader = new Leader(actorContext);
397
398         // Leader will send an immediate heartbeat - ignore it.
399         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
400
401         // The follower would normally reply - simulate that explicitly here.
402         long lastIndex = actorContext.getReplicatedLog().lastIndex();
403         leader.handleMessage(followerActor, new AppendEntriesReply(
404                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
405         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
406
407         followerActor.underlyingActor().clear();
408
409         sendReplicate(actorContext, lastIndex + 1);
410
411         // Wait slightly longer than heartbeat duration
412         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
413
414         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
415
416         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
417         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
418
419         assertEquals(1, allMessages.get(0).getEntries().size());
420         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
421         assertEquals(1, allMessages.get(1).getEntries().size());
422         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
423
424     }
425
426     @Test
427     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
428         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
432             @Override
433             public FiniteDuration getHeartBeatInterval() {
434                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
435             }
436         });
437
438         long term = 1;
439         actorContext.getTermInformation().update(term, "");
440
441         leader = new Leader(actorContext);
442
443         // Leader will send an immediate heartbeat - ignore it.
444         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
445
446         // The follower would normally reply - simulate that explicitly here.
447         long lastIndex = actorContext.getReplicatedLog().lastIndex();
448         leader.handleMessage(followerActor, new AppendEntriesReply(
449                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
450         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
451
452         followerActor.underlyingActor().clear();
453
454         for (int i = 0; i < 3; i++) {
455             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
456             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
457         }
458
459         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
460         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
461     }
462
463     @Test
464     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
465         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
466
467         MockRaftActorContext actorContext = createActorContextWithFollower();
468         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
469             @Override
470             public FiniteDuration getHeartBeatInterval() {
471                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
472             }
473         });
474
475         long term = 1;
476         actorContext.getTermInformation().update(term, "");
477
478         leader = new Leader(actorContext);
479
480         // Leader will send an immediate heartbeat - ignore it.
481         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482
483         // The follower would normally reply - simulate that explicitly here.
484         long lastIndex = actorContext.getReplicatedLog().lastIndex();
485         leader.handleMessage(followerActor, new AppendEntriesReply(
486                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
487         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
488
489         followerActor.underlyingActor().clear();
490
491         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493         sendReplicate(actorContext, lastIndex + 1);
494
495         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
497
498         assertEquals(0, allMessages.get(0).getEntries().size());
499         assertEquals(1, allMessages.get(1).getEntries().size());
500     }
501
502
503     @Test
504     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
505         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
506
507         MockRaftActorContext actorContext = createActorContext();
508
509         leader = new Leader(actorContext);
510
511         actorContext.setLastApplied(0);
512
513         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
514         long term = actorContext.getTermInformation().getCurrentTerm();
515         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
516                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
517
518         actorContext.getReplicatedLog().append(newEntry);
519
520         final Identifier id = new MockIdentifier("state-id");
521         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
522
523         // State should not change
524         assertTrue(raftBehavior instanceof Leader);
525
526         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
527
528         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
529         // one since lastApplied state is 0.
530         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
531                 leaderActor, ApplyState.class);
532         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
533
534         for (int i = 0; i <= newLogIndex - 1; i++ ) {
535             ApplyState applyState = applyStateList.get(i);
536             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
537             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
538         }
539
540         ApplyState last = applyStateList.get((int) newLogIndex - 1);
541         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
542         assertEquals("getIdentifier", id, last.getIdentifier());
543     }
544
545     @Test
546     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
547         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
548
549         final MockRaftActorContext actorContext = createActorContextWithFollower();
550
551         Map<String, String> leadersSnapshot = new HashMap<>();
552         leadersSnapshot.put("1", "A");
553         leadersSnapshot.put("2", "B");
554         leadersSnapshot.put("3", "C");
555
556         //clears leaders log
557         actorContext.getReplicatedLog().removeFrom(0);
558
559         final int commitIndex = 3;
560         final int snapshotIndex = 2;
561         final int snapshotTerm = 1;
562
563         // set the snapshot variables in replicatedlog
564         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
565         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
566         actorContext.setCommitIndex(commitIndex);
567         //set follower timeout to 2 mins, helps during debugging
568         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
569
570         leader = new Leader(actorContext);
571
572         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
573         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
574
575         //update follower timestamp
576         leader.markFollowerActive(FOLLOWER_ID);
577
578         ByteString bs = toByteString(leadersSnapshot);
579         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
580                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
581         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
582                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
583         fts.setSnapshotBytes(bs);
584         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
585
586         //send first chunk and no InstallSnapshotReply received yet
587         fts.getNextChunk();
588         fts.incrementChunkIndex();
589
590         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
591                 TimeUnit.MILLISECONDS);
592
593         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
594
595         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
596
597         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
598
599         //InstallSnapshotReply received
600         fts.markSendStatus(true);
601
602         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
603
604         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
605
606         assertEquals(commitIndex, is.getLastIncludedIndex());
607     }
608
609     @Test
610     public void testSendAppendEntriesSnapshotScenario() throws Exception {
611         logStart("testSendAppendEntriesSnapshotScenario");
612
613         final MockRaftActorContext actorContext = createActorContextWithFollower();
614
615         Map<String, String> leadersSnapshot = new HashMap<>();
616         leadersSnapshot.put("1", "A");
617         leadersSnapshot.put("2", "B");
618         leadersSnapshot.put("3", "C");
619
620         //clears leaders log
621         actorContext.getReplicatedLog().removeFrom(0);
622
623         final int followersLastIndex = 2;
624         final int snapshotIndex = 3;
625         final int newEntryIndex = 4;
626         final int snapshotTerm = 1;
627         final int currentTerm = 2;
628
629         // set the snapshot variables in replicatedlog
630         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
631         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
632         actorContext.setCommitIndex(followersLastIndex);
633
634         leader = new Leader(actorContext);
635
636         // Leader will send an immediate heartbeat - ignore it.
637         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
638
639         // new entry
640         SimpleReplicatedLogEntry entry =
641                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
642                         new MockRaftActorContext.MockPayload("D"));
643
644         actorContext.getReplicatedLog().append(entry);
645
646         //update follower timestamp
647         leader.markFollowerActive(FOLLOWER_ID);
648
649         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
650         RaftActorBehavior raftBehavior = leader.handleMessage(
651                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
652
653         assertTrue(raftBehavior instanceof Leader);
654
655         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
656     }
657
658     @Test
659     public void testInitiateInstallSnapshot() throws Exception {
660         logStart("testInitiateInstallSnapshot");
661
662         MockRaftActorContext actorContext = createActorContextWithFollower();
663
664         //clears leaders log
665         actorContext.getReplicatedLog().removeFrom(0);
666
667         final int followersLastIndex = 2;
668         final int snapshotIndex = 3;
669         final int newEntryIndex = 4;
670         final int snapshotTerm = 1;
671         final int currentTerm = 2;
672
673         // set the snapshot variables in replicatedlog
674         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
675         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
676         actorContext.setLastApplied(3);
677         actorContext.setCommitIndex(followersLastIndex);
678
679         leader = new Leader(actorContext);
680
681         // Leader will send an immediate heartbeat - ignore it.
682         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
683
684         // set the snapshot as absent and check if capture-snapshot is invoked.
685         leader.setSnapshot(null);
686
687         // new entry
688         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
689                 new MockRaftActorContext.MockPayload("D"));
690
691         actorContext.getReplicatedLog().append(entry);
692
693         //update follower timestamp
694         leader.markFollowerActive(FOLLOWER_ID);
695
696         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
697
698         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
699
700         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
701
702         assertTrue(cs.isInstallSnapshotInitiated());
703         assertEquals(3, cs.getLastAppliedIndex());
704         assertEquals(1, cs.getLastAppliedTerm());
705         assertEquals(4, cs.getLastIndex());
706         assertEquals(2, cs.getLastTerm());
707
708         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
709         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
710
711         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
712     }
713
714     @Test
715     public void testInitiateForceInstallSnapshot() throws Exception {
716         logStart("testInitiateForceInstallSnapshot");
717
718         MockRaftActorContext actorContext = createActorContextWithFollower();
719
720         final int followersLastIndex = 2;
721         final int snapshotIndex = -1;
722         final int newEntryIndex = 4;
723         final int snapshotTerm = -1;
724         final int currentTerm = 2;
725
726         // set the snapshot variables in replicatedlog
727         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
728         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
729         actorContext.setLastApplied(3);
730         actorContext.setCommitIndex(followersLastIndex);
731
732         actorContext.getReplicatedLog().removeFrom(0);
733
734         leader = new Leader(actorContext);
735         actorContext.setCurrentBehavior(leader);
736
737         // Leader will send an immediate heartbeat - ignore it.
738         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
739
740         // set the snapshot as absent and check if capture-snapshot is invoked.
741         leader.setSnapshot(null);
742
743         for (int i = 0; i < 4; i++) {
744             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
745                     new MockRaftActorContext.MockPayload("X" + i)));
746         }
747
748         // new entry
749         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
750                 new MockRaftActorContext.MockPayload("D"));
751
752         actorContext.getReplicatedLog().append(entry);
753
754         //update follower timestamp
755         leader.markFollowerActive(FOLLOWER_ID);
756
757         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
758         // installed with a SendInstallSnapshot
759         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
760
761         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
762
763         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
764
765         assertTrue(cs.isInstallSnapshotInitiated());
766         assertEquals(3, cs.getLastAppliedIndex());
767         assertEquals(1, cs.getLastAppliedTerm());
768         assertEquals(4, cs.getLastIndex());
769         assertEquals(2, cs.getLastTerm());
770
771         // if an initiate is started again when first is in progress, it should not initiate Capture
772         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
773
774         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
775     }
776
777
778     @Test
779     public void testInstallSnapshot() throws Exception {
780         logStart("testInstallSnapshot");
781
782         final MockRaftActorContext actorContext = createActorContextWithFollower();
783
784         Map<String, String> leadersSnapshot = new HashMap<>();
785         leadersSnapshot.put("1", "A");
786         leadersSnapshot.put("2", "B");
787         leadersSnapshot.put("3", "C");
788
789         //clears leaders log
790         actorContext.getReplicatedLog().removeFrom(0);
791
792         final int lastAppliedIndex = 3;
793         final int snapshotIndex = 2;
794         final int snapshotTerm = 1;
795         final int currentTerm = 2;
796
797         // set the snapshot variables in replicatedlog
798         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
799         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
800         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
801         actorContext.setCommitIndex(lastAppliedIndex);
802         actorContext.setLastApplied(lastAppliedIndex);
803
804         leader = new Leader(actorContext);
805
806         // Initial heartbeat.
807         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
808
809         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
810         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
811
812         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
813                 Collections.<ReplicatedLogEntry>emptyList(),
814                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
815
816         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
817
818         assertTrue(raftBehavior instanceof Leader);
819
820         // check if installsnapshot gets called with the correct values.
821
822         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
823                 InstallSnapshot.class);
824
825         assertNotNull(installSnapshot.getData());
826         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
827         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
828
829         assertEquals(currentTerm, installSnapshot.getTerm());
830     }
831
832     @Test
833     public void testForceInstallSnapshot() throws Exception {
834         logStart("testForceInstallSnapshot");
835
836         final MockRaftActorContext actorContext = createActorContextWithFollower();
837
838         Map<String, String> leadersSnapshot = new HashMap<>();
839         leadersSnapshot.put("1", "A");
840         leadersSnapshot.put("2", "B");
841         leadersSnapshot.put("3", "C");
842
843         final int lastAppliedIndex = 3;
844         final int snapshotIndex = -1;
845         final int snapshotTerm = -1;
846         final int currentTerm = 2;
847
848         // set the snapshot variables in replicatedlog
849         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
850         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
851         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
852         actorContext.setCommitIndex(lastAppliedIndex);
853         actorContext.setLastApplied(lastAppliedIndex);
854
855         leader = new Leader(actorContext);
856
857         // Initial heartbeat.
858         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
859
860         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
861         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
862
863         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
864                 Collections.<ReplicatedLogEntry>emptyList(),
865                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
866
867         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
868
869         assertTrue(raftBehavior instanceof Leader);
870
871         // check if installsnapshot gets called with the correct values.
872
873         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
874                 InstallSnapshot.class);
875
876         assertNotNull(installSnapshot.getData());
877         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
878         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
879
880         assertEquals(currentTerm, installSnapshot.getTerm());
881     }
882
883     @Test
884     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
885         logStart("testHandleInstallSnapshotReplyLastChunk");
886
887         MockRaftActorContext actorContext = createActorContextWithFollower();
888
889         final int commitIndex = 3;
890         final int snapshotIndex = 2;
891         final int snapshotTerm = 1;
892         final int currentTerm = 2;
893
894         actorContext.setCommitIndex(commitIndex);
895
896         leader = new Leader(actorContext);
897         actorContext.setCurrentBehavior(leader);
898
899         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
900         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
901
902         // Ignore initial heartbeat.
903         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
904
905         Map<String, String> leadersSnapshot = new HashMap<>();
906         leadersSnapshot.put("1", "A");
907         leadersSnapshot.put("2", "B");
908         leadersSnapshot.put("3", "C");
909
910         // set the snapshot variables in replicatedlog
911
912         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
913         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
914         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
915
916         ByteString bs = toByteString(leadersSnapshot);
917         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
918                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
919         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
920                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
921         fts.setSnapshotBytes(bs);
922         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
923         while (!fts.isLastChunk(fts.getChunkIndex())) {
924             fts.getNextChunk();
925             fts.incrementChunkIndex();
926         }
927
928         //clears leaders log
929         actorContext.getReplicatedLog().removeFrom(0);
930
931         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
932                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
933
934         assertTrue(raftBehavior instanceof Leader);
935
936         assertEquals(1, leader.followerLogSize());
937         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
938         assertNotNull(fli);
939         assertNull(fli.getInstallSnapshotState());
940         assertEquals(commitIndex, fli.getMatchIndex());
941         assertEquals(commitIndex + 1, fli.getNextIndex());
942         assertFalse(leader.hasSnapshot());
943     }
944
945     @Test
946     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
947         logStart("testSendSnapshotfromInstallSnapshotReply");
948
949         MockRaftActorContext actorContext = createActorContextWithFollower();
950
951         final int commitIndex = 3;
952         final int snapshotIndex = 2;
953         final int snapshotTerm = 1;
954         final int currentTerm = 2;
955
956         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
957             @Override
958             public int getSnapshotChunkSize() {
959                 return 50;
960             }
961         };
962         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
963         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
964
965         actorContext.setConfigParams(configParams);
966         actorContext.setCommitIndex(commitIndex);
967
968         leader = new Leader(actorContext);
969         actorContext.setCurrentBehavior(leader);
970
971         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
972         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
973
974         Map<String, String> leadersSnapshot = new HashMap<>();
975         leadersSnapshot.put("1", "A");
976         leadersSnapshot.put("2", "B");
977         leadersSnapshot.put("3", "C");
978
979         // set the snapshot variables in replicatedlog
980         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
981         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
982         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
983
984         ByteString bs = toByteString(leadersSnapshot);
985         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
986                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
987         leader.setSnapshot(snapshot);
988
989         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
990
991         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
992                 InstallSnapshot.class);
993
994         assertEquals(1, installSnapshot.getChunkIndex());
995         assertEquals(3, installSnapshot.getTotalChunks());
996
997         followerActor.underlyingActor().clear();
998         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
999                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1000
1001         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1002
1003         assertEquals(2, installSnapshot.getChunkIndex());
1004         assertEquals(3, installSnapshot.getTotalChunks());
1005
1006         followerActor.underlyingActor().clear();
1007         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1008                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1009
1010         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1011
1012         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1013         followerActor.underlyingActor().clear();
1014         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1015                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1016
1017         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1018
1019         assertNull(installSnapshot);
1020     }
1021
1022
1023     @Test
1024     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1025         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1026
1027         MockRaftActorContext actorContext = createActorContextWithFollower();
1028
1029         final int commitIndex = 3;
1030         final int snapshotIndex = 2;
1031         final int snapshotTerm = 1;
1032         final int currentTerm = 2;
1033
1034         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1035             @Override
1036             public int getSnapshotChunkSize() {
1037                 return 50;
1038             }
1039         });
1040
1041         actorContext.setCommitIndex(commitIndex);
1042
1043         leader = new Leader(actorContext);
1044
1045         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1046         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1047
1048         Map<String, String> leadersSnapshot = new HashMap<>();
1049         leadersSnapshot.put("1", "A");
1050         leadersSnapshot.put("2", "B");
1051         leadersSnapshot.put("3", "C");
1052
1053         // set the snapshot variables in replicatedlog
1054         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1055         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1056         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1057
1058         ByteString bs = toByteString(leadersSnapshot);
1059         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1060                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1061         leader.setSnapshot(snapshot);
1062
1063         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1064         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1065
1066         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1067                 InstallSnapshot.class);
1068
1069         assertEquals(1, installSnapshot.getChunkIndex());
1070         assertEquals(3, installSnapshot.getTotalChunks());
1071
1072         followerActor.underlyingActor().clear();
1073
1074         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1075                 FOLLOWER_ID, -1, false));
1076
1077         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1078                 TimeUnit.MILLISECONDS);
1079
1080         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1081
1082         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1083
1084         assertEquals(1, installSnapshot.getChunkIndex());
1085         assertEquals(3, installSnapshot.getTotalChunks());
1086     }
1087
1088     @Test
1089     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1090         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1091
1092         MockRaftActorContext actorContext = createActorContextWithFollower();
1093
1094         final int commitIndex = 3;
1095         final int snapshotIndex = 2;
1096         final int snapshotTerm = 1;
1097         final int currentTerm = 2;
1098
1099         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1100             @Override
1101             public int getSnapshotChunkSize() {
1102                 return 50;
1103             }
1104         });
1105
1106         actorContext.setCommitIndex(commitIndex);
1107
1108         leader = new Leader(actorContext);
1109
1110         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1111         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1112
1113         Map<String, String> leadersSnapshot = new HashMap<>();
1114         leadersSnapshot.put("1", "A");
1115         leadersSnapshot.put("2", "B");
1116         leadersSnapshot.put("3", "C");
1117
1118         // set the snapshot variables in replicatedlog
1119         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1120         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1121         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1122
1123         ByteString bs = toByteString(leadersSnapshot);
1124         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1125                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1126         leader.setSnapshot(snapshot);
1127
1128         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1129
1130         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1131                 InstallSnapshot.class);
1132
1133         assertEquals(1, installSnapshot.getChunkIndex());
1134         assertEquals(3, installSnapshot.getTotalChunks());
1135         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1136                 installSnapshot.getLastChunkHashCode().get().intValue());
1137
1138         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1139
1140         followerActor.underlyingActor().clear();
1141
1142         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1143                 FOLLOWER_ID, 1, true));
1144
1145         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1146
1147         assertEquals(2, installSnapshot.getChunkIndex());
1148         assertEquals(3, installSnapshot.getTotalChunks());
1149         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1150     }
1151
1152     @Test
1153     public void testLeaderInstallSnapshotState() {
1154         logStart("testLeaderInstallSnapshotState");
1155
1156         Map<String, String> leadersSnapshot = new HashMap<>();
1157         leadersSnapshot.put("1", "A");
1158         leadersSnapshot.put("2", "B");
1159         leadersSnapshot.put("3", "C");
1160
1161         ByteString bs = toByteString(leadersSnapshot);
1162         byte[] barray = bs.toByteArray();
1163
1164         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1165         fts.setSnapshotBytes(bs);
1166
1167         assertEquals(bs.size(), barray.length);
1168
1169         int chunkIndex = 0;
1170         for (int i = 0; i < barray.length; i = i + 50) {
1171             int length = i + 50;
1172             chunkIndex++;
1173
1174             if (i + 50 > barray.length) {
1175                 length = barray.length;
1176             }
1177
1178             byte[] chunk = fts.getNextChunk();
1179             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1180             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1181
1182             fts.markSendStatus(true);
1183             if (!fts.isLastChunk(chunkIndex)) {
1184                 fts.incrementChunkIndex();
1185             }
1186         }
1187
1188         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1189     }
1190
1191     @Override
1192     protected Leader createBehavior(final RaftActorContext actorContext) {
1193         return new Leader(actorContext);
1194     }
1195
1196     @Override
1197     protected MockRaftActorContext createActorContext() {
1198         return createActorContext(leaderActor);
1199     }
1200
1201     @Override
1202     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1203         return createActorContext(LEADER_ID, actorRef);
1204     }
1205
1206     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1207         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1208         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1209         configParams.setElectionTimeoutFactor(100000);
1210         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1211         context.setConfigParams(configParams);
1212         context.setPayloadVersion(payloadVersion);
1213         return context;
1214     }
1215
1216     private MockRaftActorContext createActorContextWithFollower() {
1217         MockRaftActorContext actorContext = createActorContext();
1218         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1219                 followerActor.path().toString()).build());
1220         return actorContext;
1221     }
1222
1223     private MockRaftActorContext createFollowerActorContextWithLeader() {
1224         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1225         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1226         followerConfig.setElectionTimeoutFactor(10000);
1227         followerActorContext.setConfigParams(followerConfig);
1228         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1229         return followerActorContext;
1230     }
1231
1232     @Test
1233     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1234         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1235
1236         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1237
1238         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1239
1240         Follower follower = new Follower(followerActorContext);
1241         followerActor.underlyingActor().setBehavior(follower);
1242         followerActorContext.setCurrentBehavior(follower);
1243
1244         Map<String, String> peerAddresses = new HashMap<>();
1245         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1246
1247         leaderActorContext.setPeerAddresses(peerAddresses);
1248
1249         leaderActorContext.getReplicatedLog().removeFrom(0);
1250
1251         //create 3 entries
1252         leaderActorContext.setReplicatedLog(
1253                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1254
1255         leaderActorContext.setCommitIndex(1);
1256
1257         followerActorContext.getReplicatedLog().removeFrom(0);
1258
1259         // follower too has the exact same log entries and has the same commit index
1260         followerActorContext.setReplicatedLog(
1261                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1262
1263         followerActorContext.setCommitIndex(1);
1264
1265         leader = new Leader(leaderActorContext);
1266         leaderActorContext.setCurrentBehavior(leader);
1267
1268         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1269
1270         assertEquals(-1, appendEntries.getLeaderCommit());
1271         assertEquals(0, appendEntries.getEntries().size());
1272         assertEquals(0, appendEntries.getPrevLogIndex());
1273
1274         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1275                 leaderActor, AppendEntriesReply.class);
1276
1277         assertEquals(2, appendEntriesReply.getLogLastIndex());
1278         assertEquals(1, appendEntriesReply.getLogLastTerm());
1279
1280         // follower returns its next index
1281         assertEquals(2, appendEntriesReply.getLogLastIndex());
1282         assertEquals(1, appendEntriesReply.getLogLastTerm());
1283
1284         follower.close();
1285     }
1286
1287     @Test
1288     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1289         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1290
1291         final MockRaftActorContext leaderActorContext = createActorContext();
1292
1293         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1294         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1295
1296         Follower follower = new Follower(followerActorContext);
1297         followerActor.underlyingActor().setBehavior(follower);
1298         followerActorContext.setCurrentBehavior(follower);
1299
1300         Map<String, String> leaderPeerAddresses = new HashMap<>();
1301         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1302
1303         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1304
1305         leaderActorContext.getReplicatedLog().removeFrom(0);
1306
1307         leaderActorContext.setReplicatedLog(
1308                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309
1310         leaderActorContext.setCommitIndex(1);
1311
1312         followerActorContext.getReplicatedLog().removeFrom(0);
1313
1314         followerActorContext.setReplicatedLog(
1315                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1316
1317         // follower has the same log entries but its commit index > leaders commit index
1318         followerActorContext.setCommitIndex(2);
1319
1320         leader = new Leader(leaderActorContext);
1321
1322         // Initial heartbeat
1323         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1324
1325         assertEquals(-1, appendEntries.getLeaderCommit());
1326         assertEquals(0, appendEntries.getEntries().size());
1327         assertEquals(0, appendEntries.getPrevLogIndex());
1328
1329         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1330                 leaderActor, AppendEntriesReply.class);
1331
1332         assertEquals(2, appendEntriesReply.getLogLastIndex());
1333         assertEquals(1, appendEntriesReply.getLogLastTerm());
1334
1335         leaderActor.underlyingActor().setBehavior(follower);
1336         leader.handleMessage(followerActor, appendEntriesReply);
1337
1338         leaderActor.underlyingActor().clear();
1339         followerActor.underlyingActor().clear();
1340
1341         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1342                 TimeUnit.MILLISECONDS);
1343
1344         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1345
1346         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1347
1348         assertEquals(2, appendEntries.getLeaderCommit());
1349         assertEquals(0, appendEntries.getEntries().size());
1350         assertEquals(2, appendEntries.getPrevLogIndex());
1351
1352         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1353
1354         assertEquals(2, appendEntriesReply.getLogLastIndex());
1355         assertEquals(1, appendEntriesReply.getLogLastTerm());
1356
1357         assertEquals(2, followerActorContext.getCommitIndex());
1358
1359         follower.close();
1360     }
1361
1362     @Test
1363     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1364         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1365
1366         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1367         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1368                 new FiniteDuration(1000, TimeUnit.SECONDS));
1369
1370         leaderActorContext.setReplicatedLog(
1371                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1372         long leaderCommitIndex = 2;
1373         leaderActorContext.setCommitIndex(leaderCommitIndex);
1374         leaderActorContext.setLastApplied(leaderCommitIndex);
1375
1376         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1377         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1378
1379         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1380
1381         followerActorContext.setReplicatedLog(
1382                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1383         followerActorContext.setCommitIndex(0);
1384         followerActorContext.setLastApplied(0);
1385
1386         Follower follower = new Follower(followerActorContext);
1387         followerActor.underlyingActor().setBehavior(follower);
1388
1389         leader = new Leader(leaderActorContext);
1390
1391         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1393                 AppendEntriesReply.class);
1394
1395         MessageCollectorActor.clearMessages(followerActor);
1396         MessageCollectorActor.clearMessages(leaderActor);
1397
1398         // Verify initial AppendEntries sent.
1399         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1400         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1401         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1402
1403         leaderActor.underlyingActor().setBehavior(leader);
1404
1405         leader.handleMessage(followerActor, appendEntriesReply);
1406
1407         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1408         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1409
1410         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1411         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1412         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1413
1414         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1415         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1416                 appendEntries.getEntries().get(0).getData());
1417         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1418         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1419                 appendEntries.getEntries().get(1).getData());
1420
1421         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1422         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1423
1424         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1425
1426         ApplyState applyState = applyStateList.get(0);
1427         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1428         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1429         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1430                 applyState.getReplicatedLogEntry().getData());
1431
1432         applyState = applyStateList.get(1);
1433         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1434         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1435         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1436                 applyState.getReplicatedLogEntry().getData());
1437
1438         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1439         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1440     }
1441
1442     @Test
1443     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1444         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1445
1446         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1447         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1448                 new FiniteDuration(1000, TimeUnit.SECONDS));
1449
1450         leaderActorContext.setReplicatedLog(
1451                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1452         long leaderCommitIndex = 1;
1453         leaderActorContext.setCommitIndex(leaderCommitIndex);
1454         leaderActorContext.setLastApplied(leaderCommitIndex);
1455
1456         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1457         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1458
1459         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1460
1461         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1462         followerActorContext.setCommitIndex(-1);
1463         followerActorContext.setLastApplied(-1);
1464
1465         Follower follower = new Follower(followerActorContext);
1466         followerActor.underlyingActor().setBehavior(follower);
1467         followerActorContext.setCurrentBehavior(follower);
1468
1469         leader = new Leader(leaderActorContext);
1470
1471         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1472         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1473                 AppendEntriesReply.class);
1474
1475         MessageCollectorActor.clearMessages(followerActor);
1476         MessageCollectorActor.clearMessages(leaderActor);
1477
1478         // Verify initial AppendEntries sent with the leader's current commit index.
1479         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1480         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1481         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1482
1483         leaderActor.underlyingActor().setBehavior(leader);
1484         leaderActorContext.setCurrentBehavior(leader);
1485
1486         leader.handleMessage(followerActor, appendEntriesReply);
1487
1488         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1489         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1490
1491         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1492         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1493         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1494
1495         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1496         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1497                 appendEntries.getEntries().get(0).getData());
1498         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1499         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1500                 appendEntries.getEntries().get(1).getData());
1501
1502         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1503         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1504
1505         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1506
1507         ApplyState applyState = applyStateList.get(0);
1508         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1509         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1510         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1511                 applyState.getReplicatedLogEntry().getData());
1512
1513         applyState = applyStateList.get(1);
1514         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1515         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1516         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1517                 applyState.getReplicatedLogEntry().getData());
1518
1519         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1520         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1521     }
1522
1523     @Test
1524     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1525         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1526
1527         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1528         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1529                 new FiniteDuration(1000, TimeUnit.SECONDS));
1530
1531         leaderActorContext.setReplicatedLog(
1532                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1533         long leaderCommitIndex = 1;
1534         leaderActorContext.setCommitIndex(leaderCommitIndex);
1535         leaderActorContext.setLastApplied(leaderCommitIndex);
1536
1537         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1538         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1539
1540         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1541
1542         followerActorContext.setReplicatedLog(
1543                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1544         followerActorContext.setCommitIndex(-1);
1545         followerActorContext.setLastApplied(-1);
1546
1547         Follower follower = new Follower(followerActorContext);
1548         followerActor.underlyingActor().setBehavior(follower);
1549         followerActorContext.setCurrentBehavior(follower);
1550
1551         leader = new Leader(leaderActorContext);
1552
1553         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1554         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1555                 AppendEntriesReply.class);
1556
1557         MessageCollectorActor.clearMessages(followerActor);
1558         MessageCollectorActor.clearMessages(leaderActor);
1559
1560         // Verify initial AppendEntries sent with the leader's current commit index.
1561         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1562         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1563         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1564
1565         leaderActor.underlyingActor().setBehavior(leader);
1566         leaderActorContext.setCurrentBehavior(leader);
1567
1568         leader.handleMessage(followerActor, appendEntriesReply);
1569
1570         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1571         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1572
1573         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1574         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1575         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1576
1577         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1578         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1579         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1580                 appendEntries.getEntries().get(0).getData());
1581         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1582         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1583         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1584                 appendEntries.getEntries().get(1).getData());
1585
1586         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1587         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1588
1589         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1590
1591         ApplyState applyState = applyStateList.get(0);
1592         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1593         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1594         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1595                 applyState.getReplicatedLogEntry().getData());
1596
1597         applyState = applyStateList.get(1);
1598         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1599         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1601                 applyState.getReplicatedLogEntry().getData());
1602
1603         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1604         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1605         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1606     }
1607
1608     @Test
1609     public void testHandleAppendEntriesReplyWithNewerTerm() {
1610         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1611
1612         MockRaftActorContext leaderActorContext = createActorContext();
1613         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1614                 new FiniteDuration(10000, TimeUnit.SECONDS));
1615
1616         leaderActorContext.setReplicatedLog(
1617                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1618
1619         leader = new Leader(leaderActorContext);
1620         leaderActor.underlyingActor().setBehavior(leader);
1621         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1622
1623         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1624                 AppendEntriesReply.class);
1625
1626         assertEquals(false, appendEntriesReply.isSuccess());
1627         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1628
1629         MessageCollectorActor.clearMessages(leaderActor);
1630     }
1631
1632     @Test
1633     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1634         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1635
1636         MockRaftActorContext leaderActorContext = createActorContext();
1637         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1638                 new FiniteDuration(10000, TimeUnit.SECONDS));
1639
1640         leaderActorContext.setReplicatedLog(
1641                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1642         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1643
1644         leader = new Leader(leaderActorContext);
1645         leaderActor.underlyingActor().setBehavior(leader);
1646         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1647
1648         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1649                 AppendEntriesReply.class);
1650
1651         assertEquals(false, appendEntriesReply.isSuccess());
1652         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1653
1654         MessageCollectorActor.clearMessages(leaderActor);
1655     }
1656
1657     @Test
1658     public void testHandleAppendEntriesReplySuccess() throws Exception {
1659         logStart("testHandleAppendEntriesReplySuccess");
1660
1661         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1662
1663         leaderActorContext.setReplicatedLog(
1664                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1665
1666         leaderActorContext.setCommitIndex(1);
1667         leaderActorContext.setLastApplied(1);
1668         leaderActorContext.getTermInformation().update(1, "leader");
1669
1670         leader = new Leader(leaderActorContext);
1671
1672         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1673
1674         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1675         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1676
1677         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1678
1679         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1680
1681         assertEquals(RaftState.Leader, raftActorBehavior.state());
1682
1683         assertEquals(2, leaderActorContext.getCommitIndex());
1684
1685         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1686                 leaderActor, ApplyJournalEntries.class);
1687
1688         assertEquals(2, leaderActorContext.getLastApplied());
1689
1690         assertEquals(2, applyJournalEntries.getToIndex());
1691
1692         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1693                 ApplyState.class);
1694
1695         assertEquals(1,applyStateList.size());
1696
1697         ApplyState applyState = applyStateList.get(0);
1698
1699         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1700
1701         assertEquals(2, followerInfo.getMatchIndex());
1702         assertEquals(3, followerInfo.getNextIndex());
1703         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1704         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1705     }
1706
1707     @Test
1708     public void testHandleAppendEntriesReplyUnknownFollower() {
1709         logStart("testHandleAppendEntriesReplyUnknownFollower");
1710
1711         MockRaftActorContext leaderActorContext = createActorContext();
1712
1713         leader = new Leader(leaderActorContext);
1714
1715         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1716
1717         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1718
1719         assertEquals(RaftState.Leader, raftActorBehavior.state());
1720     }
1721
1722     @Test
1723     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1724         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1725
1726         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1727         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1728                 new FiniteDuration(1000, TimeUnit.SECONDS));
1729         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1730
1731         leaderActorContext.setReplicatedLog(
1732                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1733         long leaderCommitIndex = 3;
1734         leaderActorContext.setCommitIndex(leaderCommitIndex);
1735         leaderActorContext.setLastApplied(leaderCommitIndex);
1736
1737         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1738         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1739         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1740         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1741
1742         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1743
1744         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1745         followerActorContext.setCommitIndex(-1);
1746         followerActorContext.setLastApplied(-1);
1747
1748         Follower follower = new Follower(followerActorContext);
1749         followerActor.underlyingActor().setBehavior(follower);
1750         followerActorContext.setCurrentBehavior(follower);
1751
1752         leader = new Leader(leaderActorContext);
1753
1754         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1755         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1756                 AppendEntriesReply.class);
1757
1758         MessageCollectorActor.clearMessages(followerActor);
1759         MessageCollectorActor.clearMessages(leaderActor);
1760
1761         // Verify initial AppendEntries sent with the leader's current commit index.
1762         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1763         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1764         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1765
1766         leaderActor.underlyingActor().setBehavior(leader);
1767         leaderActorContext.setCurrentBehavior(leader);
1768
1769         leader.handleMessage(followerActor, appendEntriesReply);
1770
1771         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1772                 AppendEntries.class, 2);
1773         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1774
1775         appendEntries = appendEntriesList.get(0);
1776         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1777         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1778         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1779
1780         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1781         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1782                 appendEntries.getEntries().get(0).getData());
1783         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1784         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1785                 appendEntries.getEntries().get(1).getData());
1786
1787         appendEntries = appendEntriesList.get(1);
1788         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1789         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1790         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1791
1792         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1793         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1794                 appendEntries.getEntries().get(0).getData());
1795         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1796         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1797                 appendEntries.getEntries().get(1).getData());
1798
1799         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1800         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1801
1802         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1803
1804         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1805         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1806     }
1807
1808     @Test
1809     public void testHandleRequestVoteReply() {
1810         logStart("testHandleRequestVoteReply");
1811
1812         MockRaftActorContext leaderActorContext = createActorContext();
1813
1814         leader = new Leader(leaderActorContext);
1815
1816         // Should be a no-op.
1817         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1818                 new RequestVoteReply(1, true));
1819
1820         assertEquals(RaftState.Leader, raftActorBehavior.state());
1821
1822         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1823
1824         assertEquals(RaftState.Leader, raftActorBehavior.state());
1825     }
1826
1827     @Test
1828     public void testIsolatedLeaderCheckNoFollowers() {
1829         logStart("testIsolatedLeaderCheckNoFollowers");
1830
1831         MockRaftActorContext leaderActorContext = createActorContext();
1832
1833         leader = new Leader(leaderActorContext);
1834         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1835         assertTrue(newBehavior instanceof Leader);
1836     }
1837
1838     @Test
1839     public void testIsolatedLeaderCheckNoVotingFollowers() {
1840         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1841
1842         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1843         Follower follower = new Follower(followerActorContext);
1844         followerActor.underlyingActor().setBehavior(follower);
1845
1846         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1847         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1848                 new FiniteDuration(1000, TimeUnit.SECONDS));
1849         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1850
1851         leader = new Leader(leaderActorContext);
1852         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1853         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1854         assertTrue("Expected Leader", newBehavior instanceof Leader);
1855     }
1856
1857     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1858         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1859         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1860
1861         MockRaftActorContext leaderActorContext = createActorContext();
1862
1863         Map<String, String> peerAddresses = new HashMap<>();
1864         peerAddresses.put("follower-1", followerActor1.path().toString());
1865         peerAddresses.put("follower-2", followerActor2.path().toString());
1866
1867         leaderActorContext.setPeerAddresses(peerAddresses);
1868         leaderActorContext.setRaftPolicy(raftPolicy);
1869
1870         leader = new Leader(leaderActorContext);
1871
1872         leader.markFollowerActive("follower-1");
1873         leader.markFollowerActive("follower-2");
1874         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1875         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1876
1877         // kill 1 follower and verify if that got killed
1878         final JavaTestKit probe = new JavaTestKit(getSystem());
1879         probe.watch(followerActor1);
1880         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1881         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1882         assertEquals(termMsg1.getActor(), followerActor1);
1883
1884         leader.markFollowerInActive("follower-1");
1885         leader.markFollowerActive("follower-2");
1886         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1887         assertTrue("Behavior not instance of Leader when majority of followers are active",
1888                 newBehavior instanceof Leader);
1889
1890         // kill 2nd follower and leader should change to Isolated leader
1891         followerActor2.tell(PoisonPill.getInstance(), null);
1892         probe.watch(followerActor2);
1893         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1894         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1895         assertEquals(termMsg2.getActor(), followerActor2);
1896
1897         leader.markFollowerInActive("follower-2");
1898         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1899     }
1900
1901     @Test
1902     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1903         logStart("testIsolatedLeaderCheckTwoFollowers");
1904
1905         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1906
1907         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1908             newBehavior instanceof IsolatedLeader);
1909     }
1910
1911     @Test
1912     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1913         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1914
1915         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1916
1917         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1918                 newBehavior instanceof Leader);
1919     }
1920
1921     @Test
1922     public void testLaggingFollowerStarvation() throws Exception {
1923         logStart("testLaggingFollowerStarvation");
1924
1925         String leaderActorId = actorFactory.generateActorId("leader");
1926         String follower1ActorId = actorFactory.generateActorId("follower");
1927         String follower2ActorId = actorFactory.generateActorId("follower");
1928
1929         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1930         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1931
1932         MockRaftActorContext leaderActorContext =
1933                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1934
1935         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1936         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1937         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1938
1939         leaderActorContext.setConfigParams(configParams);
1940
1941         leaderActorContext.setReplicatedLog(
1942                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1943
1944         Map<String, String> peerAddresses = new HashMap<>();
1945         peerAddresses.put(follower1ActorId,
1946                 follower1Actor.path().toString());
1947         peerAddresses.put(follower2ActorId,
1948                 follower2Actor.path().toString());
1949
1950         leaderActorContext.setPeerAddresses(peerAddresses);
1951         leaderActorContext.getTermInformation().update(1, leaderActorId);
1952
1953         leader = createBehavior(leaderActorContext);
1954
1955         leaderActor.underlyingActor().setBehavior(leader);
1956
1957         for (int i = 1; i < 6; i++) {
1958             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1959             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1960                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1961             assertTrue(newBehavior == leader);
1962             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1963         }
1964
1965         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1966         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1967
1968         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1969                 heartbeats.size() > 1);
1970
1971         // Check if follower-2 got AppendEntries during this time and was not starved
1972         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1973
1974         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1975                 appendEntries.size() > 1);
1976     }
1977
1978     @Test
1979     public void testReplicationConsensusWithNonVotingFollower() {
1980         logStart("testReplicationConsensusWithNonVotingFollower");
1981
1982         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1983         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1984                 new FiniteDuration(1000, TimeUnit.SECONDS));
1985
1986         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1987         leaderActorContext.setCommitIndex(-1);
1988         leaderActorContext.setLastApplied(-1);
1989
1990         String nonVotingFollowerId = "nonvoting-follower";
1991         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1992                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1993
1994         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
1995                 VotingState.NON_VOTING);
1996
1997         leader = new Leader(leaderActorContext);
1998         leaderActorContext.setCurrentBehavior(leader);
1999
2000         // Ignore initial heartbeats
2001         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2002         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2003
2004         MessageCollectorActor.clearMessages(followerActor);
2005         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2006         MessageCollectorActor.clearMessages(leaderActor);
2007
2008         // Send a Replicate message and wait for AppendEntries.
2009         sendReplicate(leaderActorContext, 0);
2010
2011         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2013
2014         // Send reply only from the voting follower and verify consensus via ApplyState.
2015         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2016
2017         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2018
2019         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2020
2021         MessageCollectorActor.clearMessages(followerActor);
2022         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2023         MessageCollectorActor.clearMessages(leaderActor);
2024
2025         // Send another Replicate message
2026         sendReplicate(leaderActorContext, 1);
2027
2028         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2029         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2030                 AppendEntries.class);
2031         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2032         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2033
2034         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2035         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2036
2037         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2038
2039         // Send reply from the voting follower and verify consensus.
2040         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2041
2042         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2043     }
2044
2045     @Test
2046     public void testTransferLeadershipWithFollowerInSync() {
2047         logStart("testTransferLeadershipWithFollowerInSync");
2048
2049         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050         leaderActorContext.setLastApplied(-1);
2051         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052                 new FiniteDuration(1000, TimeUnit.SECONDS));
2053         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2054
2055         leader = new Leader(leaderActorContext);
2056         leaderActorContext.setCurrentBehavior(leader);
2057
2058         // Initial heartbeat
2059         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2060         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2061         MessageCollectorActor.clearMessages(followerActor);
2062
2063         sendReplicate(leaderActorContext, 0);
2064         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2065
2066         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2067         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2068         MessageCollectorActor.clearMessages(followerActor);
2069
2070         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2071         leader.transferLeadership(mockTransferCohort);
2072
2073         verify(mockTransferCohort, never()).transferComplete();
2074         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2075         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2076
2077         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2078         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2079
2080         // Leader should force an election timeout
2081         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2082
2083         verify(mockTransferCohort).transferComplete();
2084     }
2085
2086     @Test
2087     public void testTransferLeadershipWithEmptyLog() {
2088         logStart("testTransferLeadershipWithEmptyLog");
2089
2090         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2091         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2092                 new FiniteDuration(1000, TimeUnit.SECONDS));
2093         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2094
2095         leader = new Leader(leaderActorContext);
2096         leaderActorContext.setCurrentBehavior(leader);
2097
2098         // Initial heartbeat
2099         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2100         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2101         MessageCollectorActor.clearMessages(followerActor);
2102
2103         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2104         leader.transferLeadership(mockTransferCohort);
2105
2106         verify(mockTransferCohort, never()).transferComplete();
2107         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2108         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2109
2110         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2111         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2112
2113         // Leader should force an election timeout
2114         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2115
2116         verify(mockTransferCohort).transferComplete();
2117     }
2118
2119     @Test
2120     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2121         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2122
2123         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2124         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2125                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2126
2127         leader = new Leader(leaderActorContext);
2128         leaderActorContext.setCurrentBehavior(leader);
2129
2130         // Initial heartbeat
2131         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2132         MessageCollectorActor.clearMessages(followerActor);
2133
2134         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2135         leader.transferLeadership(mockTransferCohort);
2136
2137         verify(mockTransferCohort, never()).transferComplete();
2138
2139         // Sync up the follower.
2140         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2141         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2142         MessageCollectorActor.clearMessages(followerActor);
2143
2144         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2145                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2146         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2147         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2148         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2149
2150         // Leader should force an election timeout
2151         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2152
2153         verify(mockTransferCohort).transferComplete();
2154     }
2155
2156     @Test
2157     public void testTransferLeadershipWithFollowerSyncTimeout() {
2158         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2159
2160         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2161         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2162                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2163         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2164         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2165
2166         leader = new Leader(leaderActorContext);
2167         leaderActorContext.setCurrentBehavior(leader);
2168
2169         // Initial heartbeat
2170         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2171         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2172         MessageCollectorActor.clearMessages(followerActor);
2173
2174         sendReplicate(leaderActorContext, 0);
2175         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2176
2177         MessageCollectorActor.clearMessages(followerActor);
2178
2179         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2180         leader.transferLeadership(mockTransferCohort);
2181
2182         verify(mockTransferCohort, never()).transferComplete();
2183
2184         // Send heartbeats to time out the transfer.
2185         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2186             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2187                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2188             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2189         }
2190
2191         verify(mockTransferCohort).abortTransfer();
2192         verify(mockTransferCohort, never()).transferComplete();
2193         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2194     }
2195
2196     @Override
2197     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2198             ActorRef actorRef, RaftRPC rpc) throws Exception {
2199         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2200         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2201     }
2202
2203     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2204
2205         private final long electionTimeOutIntervalMillis;
2206         private final int snapshotChunkSize;
2207
2208         MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2209             super();
2210             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2211             this.snapshotChunkSize = snapshotChunkSize;
2212         }
2213
2214         @Override
2215         public FiniteDuration getElectionTimeOutInterval() {
2216             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2217         }
2218
2219         @Override
2220         public int getSnapshotChunkSize() {
2221             return snapshotChunkSize;
2222         }
2223     }
2224 }