Bug 6794 - Deprecate eventbus, async-eventbus and fixed-threadpool
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
54 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class LeaderTest extends AbstractLeaderTest<Leader> {
70
71     static final String FOLLOWER_ID = "follower";
72     public static final String LEADER_ID = "leader";
73
74     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
75             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
76
77     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
79
80     private Leader leader;
81     private final short payloadVersion = 5;
82
83     @Override
84     @After
85     public void tearDown() throws Exception {
86         if(leader != null) {
87             leader.close();
88         }
89
90         super.tearDown();
91     }
92
93     @Test
94     public void testHandleMessageForUnknownMessage() throws Exception {
95         logStart("testHandleMessageForUnknownMessage");
96
97         leader = new Leader(createActorContext());
98
99         // handle message should null when it receives an unknown message
100         assertNull(leader.handleMessage(followerActor, "foo"));
101     }
102
103     @Test
104     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106
107         MockRaftActorContext actorContext = createActorContextWithFollower();
108         actorContext.setCommitIndex(-1);
109         short payloadVersion = (short)5;
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116         actorContext.setCurrentBehavior(leader);
117
118         // Leader should send an immediate heartbeat with no entries as follower is inactive.
119         long lastIndex = actorContext.getReplicatedLog().lastIndex();
120         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getTerm", term, appendEntries.getTerm());
122         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 0, appendEntries.getEntries().size());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126
127         // The follower would normally reply - simulate that explicitly here.
128         leader.handleMessage(followerActor, new AppendEntriesReply(
129                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
131
132         followerActor.underlyingActor().clear();
133
134         // Sleep for the heartbeat interval so AppendEntries is sent.
135         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137
138         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139
140         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143         assertEquals("Entries size", 1, appendEntries.getEntries().size());
144         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147     }
148
149
150     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151         return sendReplicate(actorContext, 1, index);
152     }
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157                 term, index, payload);
158         actorContext.getReplicatedLog().append(newEntry);
159         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160     }
161
162     @Test
163     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165
166         MockRaftActorContext actorContext = createActorContextWithFollower();
167
168         long term = 1;
169         actorContext.getTermInformation().update(term, "");
170
171         leader = new Leader(actorContext);
172
173         // Leader will send an immediate heartbeat - ignore it.
174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175
176         // The follower would normally reply - simulate that explicitly here.
177         long lastIndex = actorContext.getReplicatedLog().lastIndex();
178         leader.handleMessage(followerActor, new AppendEntriesReply(
179                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
181
182         followerActor.underlyingActor().clear();
183
184         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185
186         // State should not change
187         assertTrue(raftBehavior instanceof Leader);
188
189         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192         assertEquals("Entries size", 1, appendEntries.getEntries().size());
193         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
197     }
198
199     @Test
200     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202
203         MockRaftActorContext actorContext = createActorContextWithFollower();
204         actorContext.setCommitIndex(-1);
205
206         // The raft context is initialized with a couple log entries. However the commitIndex
207         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
208         // committed and applied. Now it regains leadership with a higher term (2).
209         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
210         long newTerm = prevTerm + 1;
211         actorContext.getTermInformation().update(newTerm, "");
212
213         leader = new Leader(actorContext);
214         actorContext.setCurrentBehavior(leader);
215
216         // Leader will send an immediate heartbeat - ignore it.
217         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
218
219         // The follower replies with the leader's current last index and term, simulating that it is
220         // up to date with the leader.
221         long lastIndex = actorContext.getReplicatedLog().lastIndex();
222         leader.handleMessage(followerActor, new AppendEntriesReply(
223                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
224
225         // The commit index should not get updated even though consensus was reached. This is b/c the
226         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
227         // from previous terms by counting replicas".
228         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
229
230         followerActor.underlyingActor().clear();
231
232         // Now replicate a new entry with the new term 2.
233         long newIndex = lastIndex + 1;
234         sendReplicate(actorContext, newTerm, newIndex);
235
236         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
237         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
238         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
239         assertEquals("Entries size", 1, appendEntries.getEntries().size());
240         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
241         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
242         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
243
244         // The follower replies with success. The leader should now update the commit index to the new index
245         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
246         // prior entries are committed indirectly".
247         leader.handleMessage(followerActor, new AppendEntriesReply(
248                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
249
250         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
251     }
252
253     @Test
254     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
255         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
256
257         MockRaftActorContext actorContext = createActorContextWithFollower();
258         actorContext.setRaftPolicy(createRaftPolicy(true, true));
259
260         long term = 1;
261         actorContext.getTermInformation().update(term, "");
262
263         leader = new Leader(actorContext);
264
265         // Leader will send an immediate heartbeat - ignore it.
266         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
267
268         // The follower would normally reply - simulate that explicitly here.
269         long lastIndex = actorContext.getReplicatedLog().lastIndex();
270         leader.handleMessage(followerActor, new AppendEntriesReply(
271                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
272         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
273
274         followerActor.underlyingActor().clear();
275
276         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
277
278         // State should not change
279         assertTrue(raftBehavior instanceof Leader);
280
281         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
283         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
284         assertEquals("Entries size", 1, appendEntries.getEntries().size());
285         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
286         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
287         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
288         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
289     }
290
291     @Test
292     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
293         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
294
295         MockRaftActorContext actorContext = createActorContextWithFollower();
296         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
297             @Override
298             public FiniteDuration getHeartBeatInterval() {
299                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
300             }
301         });
302
303         long term = 1;
304         actorContext.getTermInformation().update(term, "");
305
306         leader = new Leader(actorContext);
307
308         // Leader will send an immediate heartbeat - ignore it.
309         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
310
311         // The follower would normally reply - simulate that explicitly here.
312         long lastIndex = actorContext.getReplicatedLog().lastIndex();
313         leader.handleMessage(followerActor, new AppendEntriesReply(
314                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
315         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
316
317         followerActor.underlyingActor().clear();
318
319         for(int i=0;i<5;i++) {
320             sendReplicate(actorContext, lastIndex+i+1);
321         }
322
323         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
324         // We expect only 1 message to be sent because of two reasons,
325         // - an append entries reply was not received
326         // - the heartbeat interval has not expired
327         // In this scenario if multiple messages are sent they would likely be duplicates
328         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
329     }
330
331     @Test
332     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
333         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
334
335         MockRaftActorContext actorContext = createActorContextWithFollower();
336         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
337             @Override
338             public FiniteDuration getHeartBeatInterval() {
339                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
340             }
341         });
342
343         long term = 1;
344         actorContext.getTermInformation().update(term, "");
345
346         leader = new Leader(actorContext);
347
348         // Leader will send an immediate heartbeat - ignore it.
349         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
350
351         // The follower would normally reply - simulate that explicitly here.
352         long lastIndex = actorContext.getReplicatedLog().lastIndex();
353         leader.handleMessage(followerActor, new AppendEntriesReply(
354                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
355         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
356
357         followerActor.underlyingActor().clear();
358
359         for(int i=0;i<3;i++) {
360             sendReplicate(actorContext, lastIndex+i+1);
361             leader.handleMessage(followerActor, new AppendEntriesReply(
362                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
363
364         }
365
366         for(int i=3;i<5;i++) {
367             sendReplicate(actorContext, lastIndex + i + 1);
368         }
369
370         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
371         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
372         // get sent to the follower - but not the 5th
373         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
374
375         for(int i=0;i<4;i++) {
376             long expected = allMessages.get(i).getEntries().get(0).getIndex();
377             assertEquals(expected, i+2);
378         }
379     }
380
381     @Test
382     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
383         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
384
385         MockRaftActorContext actorContext = createActorContextWithFollower();
386         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
387             @Override
388             public FiniteDuration getHeartBeatInterval() {
389                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
390             }
391         });
392
393         long term = 1;
394         actorContext.getTermInformation().update(term, "");
395
396         leader = new Leader(actorContext);
397
398         // Leader will send an immediate heartbeat - ignore it.
399         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
400
401         // The follower would normally reply - simulate that explicitly here.
402         long lastIndex = actorContext.getReplicatedLog().lastIndex();
403         leader.handleMessage(followerActor, new AppendEntriesReply(
404                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
405         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
406
407         followerActor.underlyingActor().clear();
408
409         sendReplicate(actorContext, lastIndex+1);
410
411         // Wait slightly longer than heartbeat duration
412         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
413
414         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
415
416         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
417         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
418
419         assertEquals(1, allMessages.get(0).getEntries().size());
420         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
421         assertEquals(1, allMessages.get(1).getEntries().size());
422         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
423
424     }
425
426     @Test
427     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
428         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
432             @Override
433             public FiniteDuration getHeartBeatInterval() {
434                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
435             }
436         });
437
438         long term = 1;
439         actorContext.getTermInformation().update(term, "");
440
441         leader = new Leader(actorContext);
442
443         // Leader will send an immediate heartbeat - ignore it.
444         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
445
446         // The follower would normally reply - simulate that explicitly here.
447         long lastIndex = actorContext.getReplicatedLog().lastIndex();
448         leader.handleMessage(followerActor, new AppendEntriesReply(
449                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
450         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
451
452         followerActor.underlyingActor().clear();
453
454         for(int i=0;i<3;i++) {
455             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
456             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
457         }
458
459         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
460         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
461     }
462
463     @Test
464     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
465         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
466
467         MockRaftActorContext actorContext = createActorContextWithFollower();
468         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
469             @Override
470             public FiniteDuration getHeartBeatInterval() {
471                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
472             }
473         });
474
475         long term = 1;
476         actorContext.getTermInformation().update(term, "");
477
478         leader = new Leader(actorContext);
479
480         // Leader will send an immediate heartbeat - ignore it.
481         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482
483         // The follower would normally reply - simulate that explicitly here.
484         long lastIndex = actorContext.getReplicatedLog().lastIndex();
485         leader.handleMessage(followerActor, new AppendEntriesReply(
486                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
487         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
488
489         followerActor.underlyingActor().clear();
490
491         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493         sendReplicate(actorContext, lastIndex+1);
494
495         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
497
498         assertEquals(0, allMessages.get(0).getEntries().size());
499         assertEquals(1, allMessages.get(1).getEntries().size());
500     }
501
502
503     @Test
504     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
505         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
506
507         MockRaftActorContext actorContext = createActorContext();
508
509         leader = new Leader(actorContext);
510
511         actorContext.setLastApplied(0);
512
513         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
514         long term = actorContext.getTermInformation().getCurrentTerm();
515         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
516                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
517
518         actorContext.getReplicatedLog().append(newEntry);
519
520         final Identifier id = new MockIdentifier("state-id");
521         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
522
523         // State should not change
524         assertTrue(raftBehavior instanceof Leader);
525
526         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
527
528         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
529         // one since lastApplied state is 0.
530         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
531                 leaderActor, ApplyState.class);
532         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
533
534         for(int i = 0; i <= newLogIndex - 1; i++ ) {
535             ApplyState applyState = applyStateList.get(i);
536             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
537             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
538         }
539
540         ApplyState last = applyStateList.get((int) newLogIndex - 1);
541         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
542         assertEquals("getIdentifier", id, last.getIdentifier());
543     }
544
545     @Test
546     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
547         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
548
549         MockRaftActorContext actorContext = createActorContextWithFollower();
550
551         Map<String, String> leadersSnapshot = new HashMap<>();
552         leadersSnapshot.put("1", "A");
553         leadersSnapshot.put("2", "B");
554         leadersSnapshot.put("3", "C");
555
556         //clears leaders log
557         actorContext.getReplicatedLog().removeFrom(0);
558
559         final int commitIndex = 3;
560         final int snapshotIndex = 2;
561         final int newEntryIndex = 4;
562         final int snapshotTerm = 1;
563         final int currentTerm = 2;
564
565         // set the snapshot variables in replicatedlog
566         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
567         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
568         actorContext.setCommitIndex(commitIndex);
569         //set follower timeout to 2 mins, helps during debugging
570         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
571
572         leader = new Leader(actorContext);
573
574         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
575         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
576
577         // new entry
578         ReplicatedLogImplEntry entry =
579                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580                         new MockRaftActorContext.MockPayload("D"));
581
582         //update follower timestamp
583         leader.markFollowerActive(FOLLOWER_ID);
584
585         ByteString bs = toByteString(leadersSnapshot);
586         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
587                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
588         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
589         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
590
591         //send first chunk and no InstallSnapshotReply received yet
592         fts.getNextChunk();
593         fts.incrementChunkIndex();
594
595         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
596                 TimeUnit.MILLISECONDS);
597
598         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
599
600         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
601
602         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
603
604         //InstallSnapshotReply received
605         fts.markSendStatus(true);
606
607         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
608
609         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
610
611         assertEquals(commitIndex, is.getLastIncludedIndex());
612     }
613
614     @Test
615     public void testSendAppendEntriesSnapshotScenario() throws Exception {
616         logStart("testSendAppendEntriesSnapshotScenario");
617
618         MockRaftActorContext actorContext = createActorContextWithFollower();
619
620         Map<String, String> leadersSnapshot = new HashMap<>();
621         leadersSnapshot.put("1", "A");
622         leadersSnapshot.put("2", "B");
623         leadersSnapshot.put("3", "C");
624
625         //clears leaders log
626         actorContext.getReplicatedLog().removeFrom(0);
627
628         final int followersLastIndex = 2;
629         final int snapshotIndex = 3;
630         final int newEntryIndex = 4;
631         final int snapshotTerm = 1;
632         final int currentTerm = 2;
633
634         // set the snapshot variables in replicatedlog
635         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
636         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
637         actorContext.setCommitIndex(followersLastIndex);
638
639         leader = new Leader(actorContext);
640
641         // Leader will send an immediate heartbeat - ignore it.
642         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
643
644         // new entry
645         ReplicatedLogImplEntry entry =
646                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
647                         new MockRaftActorContext.MockPayload("D"));
648
649         actorContext.getReplicatedLog().append(entry);
650
651         //update follower timestamp
652         leader.markFollowerActive(FOLLOWER_ID);
653
654         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
655         RaftActorBehavior raftBehavior = leader.handleMessage(
656                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
657
658         assertTrue(raftBehavior instanceof Leader);
659
660         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
661     }
662
663     @Test
664     public void testInitiateInstallSnapshot() throws Exception {
665         logStart("testInitiateInstallSnapshot");
666
667         MockRaftActorContext actorContext = createActorContextWithFollower();
668
669         //clears leaders log
670         actorContext.getReplicatedLog().removeFrom(0);
671
672         final int followersLastIndex = 2;
673         final int snapshotIndex = 3;
674         final int newEntryIndex = 4;
675         final int snapshotTerm = 1;
676         final int currentTerm = 2;
677
678         // set the snapshot variables in replicatedlog
679         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681         actorContext.setLastApplied(3);
682         actorContext.setCommitIndex(followersLastIndex);
683
684         leader = new Leader(actorContext);
685
686         // Leader will send an immediate heartbeat - ignore it.
687         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
688
689         // set the snapshot as absent and check if capture-snapshot is invoked.
690         leader.setSnapshot(null);
691
692         // new entry
693         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
694                 new MockRaftActorContext.MockPayload("D"));
695
696         actorContext.getReplicatedLog().append(entry);
697
698         //update follower timestamp
699         leader.markFollowerActive(FOLLOWER_ID);
700
701         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
702
703         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
704
705         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
706
707         assertTrue(cs.isInstallSnapshotInitiated());
708         assertEquals(3, cs.getLastAppliedIndex());
709         assertEquals(1, cs.getLastAppliedTerm());
710         assertEquals(4, cs.getLastIndex());
711         assertEquals(2, cs.getLastTerm());
712
713         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
714         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
715
716         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
717     }
718
719     @Test
720     public void testInitiateForceInstallSnapshot() throws Exception {
721         logStart("testInitiateForceInstallSnapshot");
722
723         MockRaftActorContext actorContext = createActorContextWithFollower();
724
725         final int followersLastIndex = 2;
726         final int snapshotIndex = -1;
727         final int newEntryIndex = 4;
728         final int snapshotTerm = -1;
729         final int currentTerm = 2;
730
731         // set the snapshot variables in replicatedlog
732         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734         actorContext.setLastApplied(3);
735         actorContext.setCommitIndex(followersLastIndex);
736
737         actorContext.getReplicatedLog().removeFrom(0);
738
739         leader = new Leader(actorContext);
740
741         // Leader will send an immediate heartbeat - ignore it.
742         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
743
744         // set the snapshot as absent and check if capture-snapshot is invoked.
745         leader.setSnapshot(null);
746
747         for(int i=0;i<4;i++) {
748             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
749                     new MockRaftActorContext.MockPayload("X" + i)));
750         }
751
752         // new entry
753         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
754                 new MockRaftActorContext.MockPayload("D"));
755
756         actorContext.getReplicatedLog().append(entry);
757
758         //update follower timestamp
759         leader.markFollowerActive(FOLLOWER_ID);
760
761         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
762         // installed with a SendInstallSnapshot
763         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
764
765         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
766
767         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
768
769         assertTrue(cs.isInstallSnapshotInitiated());
770         assertEquals(3, cs.getLastAppliedIndex());
771         assertEquals(1, cs.getLastAppliedTerm());
772         assertEquals(4, cs.getLastIndex());
773         assertEquals(2, cs.getLastTerm());
774
775         // if an initiate is started again when first is in progress, it should not initiate Capture
776         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
777
778         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
779     }
780
781
782     @Test
783     public void testInstallSnapshot() throws Exception {
784         logStart("testInstallSnapshot");
785
786         MockRaftActorContext actorContext = createActorContextWithFollower();
787
788         Map<String, String> leadersSnapshot = new HashMap<>();
789         leadersSnapshot.put("1", "A");
790         leadersSnapshot.put("2", "B");
791         leadersSnapshot.put("3", "C");
792
793         //clears leaders log
794         actorContext.getReplicatedLog().removeFrom(0);
795
796         final int lastAppliedIndex = 3;
797         final int snapshotIndex = 2;
798         final int snapshotTerm = 1;
799         final int currentTerm = 2;
800
801         // set the snapshot variables in replicatedlog
802         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805         actorContext.setCommitIndex(lastAppliedIndex);
806         actorContext.setLastApplied(lastAppliedIndex);
807
808         leader = new Leader(actorContext);
809
810         // Initial heartbeat.
811         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
812
813         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
814         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
815
816         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
817                 Collections.<ReplicatedLogEntry>emptyList(),
818                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
819
820         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
821
822         assertTrue(raftBehavior instanceof Leader);
823
824         // check if installsnapshot gets called with the correct values.
825
826         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
827
828         assertNotNull(installSnapshot.getData());
829         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
830         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
831
832         assertEquals(currentTerm, installSnapshot.getTerm());
833     }
834
835     @Test
836     public void testForceInstallSnapshot() throws Exception {
837         logStart("testForceInstallSnapshot");
838
839         MockRaftActorContext actorContext = createActorContextWithFollower();
840
841         Map<String, String> leadersSnapshot = new HashMap<>();
842         leadersSnapshot.put("1", "A");
843         leadersSnapshot.put("2", "B");
844         leadersSnapshot.put("3", "C");
845
846         final int lastAppliedIndex = 3;
847         final int snapshotIndex = -1;
848         final int snapshotTerm = -1;
849         final int currentTerm = 2;
850
851         // set the snapshot variables in replicatedlog
852         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
853         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
854         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
855         actorContext.setCommitIndex(lastAppliedIndex);
856         actorContext.setLastApplied(lastAppliedIndex);
857
858         leader = new Leader(actorContext);
859
860         // Initial heartbeat.
861         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
862
863         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
864         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
865
866         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
867                 Collections.<ReplicatedLogEntry>emptyList(),
868                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
869
870         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
871
872         assertTrue(raftBehavior instanceof Leader);
873
874         // check if installsnapshot gets called with the correct values.
875
876         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
877
878         assertNotNull(installSnapshot.getData());
879         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
880         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
881
882         assertEquals(currentTerm, installSnapshot.getTerm());
883     }
884
885     @Test
886     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
887         logStart("testHandleInstallSnapshotReplyLastChunk");
888
889         MockRaftActorContext actorContext = createActorContextWithFollower();
890
891         final int commitIndex = 3;
892         final int snapshotIndex = 2;
893         final int snapshotTerm = 1;
894         final int currentTerm = 2;
895
896         actorContext.setCommitIndex(commitIndex);
897
898         leader = new Leader(actorContext);
899         actorContext.setCurrentBehavior(leader);
900
901         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
902         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
903
904         // Ignore initial heartbeat.
905         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
906
907         Map<String, String> leadersSnapshot = new HashMap<>();
908         leadersSnapshot.put("1", "A");
909         leadersSnapshot.put("2", "B");
910         leadersSnapshot.put("3", "C");
911
912         // set the snapshot variables in replicatedlog
913
914         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
915         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
916         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917
918         ByteString bs = toByteString(leadersSnapshot);
919         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
920                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
921         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
922         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
923         while(!fts.isLastChunk(fts.getChunkIndex())) {
924             fts.getNextChunk();
925             fts.incrementChunkIndex();
926         }
927
928         //clears leaders log
929         actorContext.getReplicatedLog().removeFrom(0);
930
931         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
932                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
933
934         assertTrue(raftBehavior instanceof Leader);
935
936         assertEquals(0, leader.followerSnapshotSize());
937         assertEquals(1, leader.followerLogSize());
938         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
939         assertNotNull(fli);
940         assertEquals(commitIndex, fli.getMatchIndex());
941         assertEquals(commitIndex + 1, fli.getNextIndex());
942     }
943
944     @Test
945     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
946         logStart("testSendSnapshotfromInstallSnapshotReply");
947
948         MockRaftActorContext actorContext = createActorContextWithFollower();
949
950         final int commitIndex = 3;
951         final int snapshotIndex = 2;
952         final int snapshotTerm = 1;
953         final int currentTerm = 2;
954
955         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
956             @Override
957             public int getSnapshotChunkSize() {
958                 return 50;
959             }
960         };
961         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
962         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
963
964         actorContext.setConfigParams(configParams);
965         actorContext.setCommitIndex(commitIndex);
966
967         leader = new Leader(actorContext);
968         actorContext.setCurrentBehavior(leader);
969
970         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
971         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
972
973         Map<String, String> leadersSnapshot = new HashMap<>();
974         leadersSnapshot.put("1", "A");
975         leadersSnapshot.put("2", "B");
976         leadersSnapshot.put("3", "C");
977
978         // set the snapshot variables in replicatedlog
979         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
980         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
981         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
982
983         ByteString bs = toByteString(leadersSnapshot);
984         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
985                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
986         leader.setSnapshot(snapshot);
987
988         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
989
990         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
991
992         assertEquals(1, installSnapshot.getChunkIndex());
993         assertEquals(3, installSnapshot.getTotalChunks());
994
995         followerActor.underlyingActor().clear();
996         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
997                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
998
999         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1000
1001         assertEquals(2, installSnapshot.getChunkIndex());
1002         assertEquals(3, installSnapshot.getTotalChunks());
1003
1004         followerActor.underlyingActor().clear();
1005         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1006                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1007
1008         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1009
1010         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1011         followerActor.underlyingActor().clear();
1012         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1013                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1014
1015         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1016
1017         assertNull(installSnapshot);
1018     }
1019
1020
1021     @Test
1022     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1023         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1024
1025         MockRaftActorContext actorContext = createActorContextWithFollower();
1026
1027         final int commitIndex = 3;
1028         final int snapshotIndex = 2;
1029         final int snapshotTerm = 1;
1030         final int currentTerm = 2;
1031
1032         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1033             @Override
1034             public int getSnapshotChunkSize() {
1035                 return 50;
1036             }
1037         });
1038
1039         actorContext.setCommitIndex(commitIndex);
1040
1041         leader = new Leader(actorContext);
1042
1043         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1044         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1045
1046         Map<String, String> leadersSnapshot = new HashMap<>();
1047         leadersSnapshot.put("1", "A");
1048         leadersSnapshot.put("2", "B");
1049         leadersSnapshot.put("3", "C");
1050
1051         // set the snapshot variables in replicatedlog
1052         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1053         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1054         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1055
1056         ByteString bs = toByteString(leadersSnapshot);
1057         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1058                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1059         leader.setSnapshot(snapshot);
1060
1061         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1062         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1063
1064         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1065
1066         assertEquals(1, installSnapshot.getChunkIndex());
1067         assertEquals(3, installSnapshot.getTotalChunks());
1068
1069         followerActor.underlyingActor().clear();
1070
1071         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1072                 FOLLOWER_ID, -1, false));
1073
1074         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1075                 TimeUnit.MILLISECONDS);
1076
1077         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1078
1079         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1080
1081         assertEquals(1, installSnapshot.getChunkIndex());
1082         assertEquals(3, installSnapshot.getTotalChunks());
1083     }
1084
1085     @Test
1086     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1087         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1088
1089         MockRaftActorContext actorContext = createActorContextWithFollower();
1090
1091         final int commitIndex = 3;
1092         final int snapshotIndex = 2;
1093         final int snapshotTerm = 1;
1094         final int currentTerm = 2;
1095
1096         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1097             @Override
1098             public int getSnapshotChunkSize() {
1099                 return 50;
1100             }
1101         });
1102
1103         actorContext.setCommitIndex(commitIndex);
1104
1105         leader = new Leader(actorContext);
1106
1107         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1108         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1109
1110         Map<String, String> leadersSnapshot = new HashMap<>();
1111         leadersSnapshot.put("1", "A");
1112         leadersSnapshot.put("2", "B");
1113         leadersSnapshot.put("3", "C");
1114
1115         // set the snapshot variables in replicatedlog
1116         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1117         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1118         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1119
1120         ByteString bs = toByteString(leadersSnapshot);
1121         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1122                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1123         leader.setSnapshot(snapshot);
1124
1125         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1126
1127         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1128
1129         assertEquals(1, installSnapshot.getChunkIndex());
1130         assertEquals(3, installSnapshot.getTotalChunks());
1131         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1132
1133         int hashCode = Arrays.hashCode(installSnapshot.getData());
1134
1135         followerActor.underlyingActor().clear();
1136
1137         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1138                 FOLLOWER_ID, 1, true));
1139
1140         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1141
1142         assertEquals(2, installSnapshot.getChunkIndex());
1143         assertEquals(3, installSnapshot.getTotalChunks());
1144         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1145     }
1146
1147     @Test
1148     public void testFollowerToSnapshotLogic() {
1149         logStart("testFollowerToSnapshotLogic");
1150
1151         MockRaftActorContext actorContext = createActorContext();
1152
1153         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1154             @Override
1155             public int getSnapshotChunkSize() {
1156                 return 50;
1157             }
1158         });
1159
1160         leader = new Leader(actorContext);
1161
1162         Map<String, String> leadersSnapshot = new HashMap<>();
1163         leadersSnapshot.put("1", "A");
1164         leadersSnapshot.put("2", "B");
1165         leadersSnapshot.put("3", "C");
1166
1167         ByteString bs = toByteString(leadersSnapshot);
1168         byte[] barray = bs.toByteArray();
1169
1170         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1171         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1172
1173         assertEquals(bs.size(), barray.length);
1174
1175         int chunkIndex=0;
1176         for (int i=0; i < barray.length; i = i + 50) {
1177             int j = i + 50;
1178             chunkIndex++;
1179
1180             if (i + 50 > barray.length) {
1181                 j = barray.length;
1182             }
1183
1184             byte[] chunk = fts.getNextChunk();
1185             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1186             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1187
1188             fts.markSendStatus(true);
1189             if (!fts.isLastChunk(chunkIndex)) {
1190                 fts.incrementChunkIndex();
1191             }
1192         }
1193
1194         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1195     }
1196
1197     @Override
1198     protected Leader createBehavior(final RaftActorContext actorContext) {
1199         return new Leader(actorContext);
1200     }
1201
1202     @Override
1203     protected MockRaftActorContext createActorContext() {
1204         return createActorContext(leaderActor);
1205     }
1206
1207     @Override
1208     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1209         return createActorContext(LEADER_ID, actorRef);
1210     }
1211
1212     private MockRaftActorContext createActorContextWithFollower() {
1213         MockRaftActorContext actorContext = createActorContext();
1214         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1215                 followerActor.path().toString()).build());
1216         return actorContext;
1217     }
1218
1219     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1220         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1221         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1222         configParams.setElectionTimeoutFactor(100000);
1223         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1224         context.setConfigParams(configParams);
1225         context.setPayloadVersion(payloadVersion);
1226         return context;
1227     }
1228
1229     private MockRaftActorContext createFollowerActorContextWithLeader() {
1230         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1231         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1232         followerConfig.setElectionTimeoutFactor(10000);
1233         followerActorContext.setConfigParams(followerConfig);
1234         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1235         return followerActorContext;
1236     }
1237
1238     @Test
1239     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1240         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1241
1242         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1243
1244         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1245
1246         Follower follower = new Follower(followerActorContext);
1247         followerActor.underlyingActor().setBehavior(follower);
1248         followerActorContext.setCurrentBehavior(follower);
1249
1250         Map<String, String> peerAddresses = new HashMap<>();
1251         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1252
1253         leaderActorContext.setPeerAddresses(peerAddresses);
1254
1255         leaderActorContext.getReplicatedLog().removeFrom(0);
1256
1257         //create 3 entries
1258         leaderActorContext.setReplicatedLog(
1259                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1260
1261         leaderActorContext.setCommitIndex(1);
1262
1263         followerActorContext.getReplicatedLog().removeFrom(0);
1264
1265         // follower too has the exact same log entries and has the same commit index
1266         followerActorContext.setReplicatedLog(
1267                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1268
1269         followerActorContext.setCommitIndex(1);
1270
1271         leader = new Leader(leaderActorContext);
1272         leaderActorContext.setCurrentBehavior(leader);
1273
1274         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1275
1276         assertEquals(1, appendEntries.getLeaderCommit());
1277         assertEquals(0, appendEntries.getEntries().size());
1278         assertEquals(0, appendEntries.getPrevLogIndex());
1279
1280         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1281                 leaderActor, AppendEntriesReply.class);
1282
1283         assertEquals(2, appendEntriesReply.getLogLastIndex());
1284         assertEquals(1, appendEntriesReply.getLogLastTerm());
1285
1286         // follower returns its next index
1287         assertEquals(2, appendEntriesReply.getLogLastIndex());
1288         assertEquals(1, appendEntriesReply.getLogLastTerm());
1289
1290         follower.close();
1291     }
1292
1293     @Test
1294     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1295         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1296
1297         MockRaftActorContext leaderActorContext = createActorContext();
1298
1299         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1300         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1301
1302         Follower follower = new Follower(followerActorContext);
1303         followerActor.underlyingActor().setBehavior(follower);
1304         followerActorContext.setCurrentBehavior(follower);
1305
1306         Map<String, String> leaderPeerAddresses = new HashMap<>();
1307         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1308
1309         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1310
1311         leaderActorContext.getReplicatedLog().removeFrom(0);
1312
1313         leaderActorContext.setReplicatedLog(
1314                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315
1316         leaderActorContext.setCommitIndex(1);
1317
1318         followerActorContext.getReplicatedLog().removeFrom(0);
1319
1320         followerActorContext.setReplicatedLog(
1321                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1322
1323         // follower has the same log entries but its commit index > leaders commit index
1324         followerActorContext.setCommitIndex(2);
1325
1326         leader = new Leader(leaderActorContext);
1327
1328         // Initial heartbeat
1329         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1330
1331         assertEquals(1, appendEntries.getLeaderCommit());
1332         assertEquals(0, appendEntries.getEntries().size());
1333         assertEquals(0, appendEntries.getPrevLogIndex());
1334
1335         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1336                 leaderActor, AppendEntriesReply.class);
1337
1338         assertEquals(2, appendEntriesReply.getLogLastIndex());
1339         assertEquals(1, appendEntriesReply.getLogLastTerm());
1340
1341         leaderActor.underlyingActor().setBehavior(follower);
1342         leader.handleMessage(followerActor, appendEntriesReply);
1343
1344         leaderActor.underlyingActor().clear();
1345         followerActor.underlyingActor().clear();
1346
1347         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1348                 TimeUnit.MILLISECONDS);
1349
1350         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1351
1352         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1353
1354         assertEquals(2, appendEntries.getLeaderCommit());
1355         assertEquals(0, appendEntries.getEntries().size());
1356         assertEquals(2, appendEntries.getPrevLogIndex());
1357
1358         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1359
1360         assertEquals(2, appendEntriesReply.getLogLastIndex());
1361         assertEquals(1, appendEntriesReply.getLogLastTerm());
1362
1363         assertEquals(2, followerActorContext.getCommitIndex());
1364
1365         follower.close();
1366     }
1367
1368     @Test
1369     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1370         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1371
1372         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1373         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1374                 new FiniteDuration(1000, TimeUnit.SECONDS));
1375
1376         leaderActorContext.setReplicatedLog(
1377                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1378         long leaderCommitIndex = 2;
1379         leaderActorContext.setCommitIndex(leaderCommitIndex);
1380         leaderActorContext.setLastApplied(leaderCommitIndex);
1381
1382         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1383         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1384
1385         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1386
1387         followerActorContext.setReplicatedLog(
1388                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1389         followerActorContext.setCommitIndex(0);
1390         followerActorContext.setLastApplied(0);
1391
1392         Follower follower = new Follower(followerActorContext);
1393         followerActor.underlyingActor().setBehavior(follower);
1394
1395         leader = new Leader(leaderActorContext);
1396
1397         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1398         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1399
1400         MessageCollectorActor.clearMessages(followerActor);
1401         MessageCollectorActor.clearMessages(leaderActor);
1402
1403         // Verify initial AppendEntries sent with the leader's current commit index.
1404         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1405         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1406         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1407
1408         leaderActor.underlyingActor().setBehavior(leader);
1409
1410         leader.handleMessage(followerActor, appendEntriesReply);
1411
1412         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1413         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1414
1415         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1416         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1417         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1418
1419         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1420         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1421                 appendEntries.getEntries().get(0).getData());
1422         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1423         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1424                 appendEntries.getEntries().get(1).getData());
1425
1426         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1427         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1428
1429         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1430
1431         ApplyState applyState = applyStateList.get(0);
1432         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1433         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1434         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1435                 applyState.getReplicatedLogEntry().getData());
1436
1437         applyState = applyStateList.get(1);
1438         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1439         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1440         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1441                 applyState.getReplicatedLogEntry().getData());
1442
1443         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1444         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1445     }
1446
1447     @Test
1448     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1449         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1450
1451         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1452         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1453                 new FiniteDuration(1000, TimeUnit.SECONDS));
1454
1455         leaderActorContext.setReplicatedLog(
1456                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1457         long leaderCommitIndex = 1;
1458         leaderActorContext.setCommitIndex(leaderCommitIndex);
1459         leaderActorContext.setLastApplied(leaderCommitIndex);
1460
1461         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1462         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1463
1464         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1465
1466         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1467         followerActorContext.setCommitIndex(-1);
1468         followerActorContext.setLastApplied(-1);
1469
1470         Follower follower = new Follower(followerActorContext);
1471         followerActor.underlyingActor().setBehavior(follower);
1472         followerActorContext.setCurrentBehavior(follower);
1473
1474         leader = new Leader(leaderActorContext);
1475
1476         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1477         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1478
1479         MessageCollectorActor.clearMessages(followerActor);
1480         MessageCollectorActor.clearMessages(leaderActor);
1481
1482         // Verify initial AppendEntries sent with the leader's current commit index.
1483         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1484         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1485         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1486
1487         leaderActor.underlyingActor().setBehavior(leader);
1488         leaderActorContext.setCurrentBehavior(leader);
1489
1490         leader.handleMessage(followerActor, appendEntriesReply);
1491
1492         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1493         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1494
1495         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1496         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1497         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1498
1499         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1500         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1501                 appendEntries.getEntries().get(0).getData());
1502         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1503         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1504                 appendEntries.getEntries().get(1).getData());
1505
1506         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1507         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1508
1509         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1510
1511         ApplyState applyState = applyStateList.get(0);
1512         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1513         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1514         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1515                 applyState.getReplicatedLogEntry().getData());
1516
1517         applyState = applyStateList.get(1);
1518         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1519         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1520         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1521                 applyState.getReplicatedLogEntry().getData());
1522
1523         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1524         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1525     }
1526
1527     @Test
1528     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1529         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1530
1531         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1532         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1533                 new FiniteDuration(1000, TimeUnit.SECONDS));
1534
1535         leaderActorContext.setReplicatedLog(
1536                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1537         long leaderCommitIndex = 1;
1538         leaderActorContext.setCommitIndex(leaderCommitIndex);
1539         leaderActorContext.setLastApplied(leaderCommitIndex);
1540
1541         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1542         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1543
1544         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1545
1546         followerActorContext.setReplicatedLog(
1547                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1548         followerActorContext.setCommitIndex(-1);
1549         followerActorContext.setLastApplied(-1);
1550
1551         Follower follower = new Follower(followerActorContext);
1552         followerActor.underlyingActor().setBehavior(follower);
1553         followerActorContext.setCurrentBehavior(follower);
1554
1555         leader = new Leader(leaderActorContext);
1556
1557         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1558         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1559
1560         MessageCollectorActor.clearMessages(followerActor);
1561         MessageCollectorActor.clearMessages(leaderActor);
1562
1563         // Verify initial AppendEntries sent with the leader's current commit index.
1564         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1565         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1566         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1567
1568         leaderActor.underlyingActor().setBehavior(leader);
1569         leaderActorContext.setCurrentBehavior(leader);
1570
1571         leader.handleMessage(followerActor, appendEntriesReply);
1572
1573         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1574         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1575
1576         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1577         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1578         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1579
1580         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1581         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1582         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1583                 appendEntries.getEntries().get(0).getData());
1584         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1585         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1586         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1587                 appendEntries.getEntries().get(1).getData());
1588
1589         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1590         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1591
1592         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1593
1594         ApplyState applyState = applyStateList.get(0);
1595         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1596         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1597         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1598                 applyState.getReplicatedLogEntry().getData());
1599
1600         applyState = applyStateList.get(1);
1601         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1602         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1603         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1604                 applyState.getReplicatedLogEntry().getData());
1605
1606         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1607         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1608         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1609     }
1610
1611     @Test
1612     public void testHandleAppendEntriesReplyWithNewerTerm(){
1613         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1614
1615         MockRaftActorContext leaderActorContext = createActorContext();
1616         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1617                 new FiniteDuration(10000, TimeUnit.SECONDS));
1618
1619         leaderActorContext.setReplicatedLog(
1620                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1621
1622         leader = new Leader(leaderActorContext);
1623         leaderActor.underlyingActor().setBehavior(leader);
1624         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1625
1626         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1627
1628         assertEquals(false, appendEntriesReply.isSuccess());
1629         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1630
1631         MessageCollectorActor.clearMessages(leaderActor);
1632     }
1633
1634     @Test
1635     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1636         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1637
1638         MockRaftActorContext leaderActorContext = createActorContext();
1639         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1640                 new FiniteDuration(10000, TimeUnit.SECONDS));
1641
1642         leaderActorContext.setReplicatedLog(
1643                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1644         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1645
1646         leader = new Leader(leaderActorContext);
1647         leaderActor.underlyingActor().setBehavior(leader);
1648         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1649
1650         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1651
1652         assertEquals(false, appendEntriesReply.isSuccess());
1653         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1654
1655         MessageCollectorActor.clearMessages(leaderActor);
1656     }
1657
1658     @Test
1659     public void testHandleAppendEntriesReplySuccess() throws Exception {
1660         logStart("testHandleAppendEntriesReplySuccess");
1661
1662         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1663
1664         leaderActorContext.setReplicatedLog(
1665                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1666
1667         leaderActorContext.setCommitIndex(1);
1668         leaderActorContext.setLastApplied(1);
1669         leaderActorContext.getTermInformation().update(1, "leader");
1670
1671         leader = new Leader(leaderActorContext);
1672
1673         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1674
1675         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1676         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1677
1678         short payloadVersion = 5;
1679         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1680
1681         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1682
1683         assertEquals(RaftState.Leader, raftActorBehavior.state());
1684
1685         assertEquals(2, leaderActorContext.getCommitIndex());
1686
1687         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1688                 leaderActor, ApplyJournalEntries.class);
1689
1690         assertEquals(2, leaderActorContext.getLastApplied());
1691
1692         assertEquals(2, applyJournalEntries.getToIndex());
1693
1694         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1695                 ApplyState.class);
1696
1697         assertEquals(1,applyStateList.size());
1698
1699         ApplyState applyState = applyStateList.get(0);
1700
1701         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1702
1703         assertEquals(2, followerInfo.getMatchIndex());
1704         assertEquals(3, followerInfo.getNextIndex());
1705         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1706         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1707     }
1708
1709     @Test
1710     public void testHandleAppendEntriesReplyUnknownFollower(){
1711         logStart("testHandleAppendEntriesReplyUnknownFollower");
1712
1713         MockRaftActorContext leaderActorContext = createActorContext();
1714
1715         leader = new Leader(leaderActorContext);
1716
1717         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1718
1719         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1720
1721         assertEquals(RaftState.Leader, raftActorBehavior.state());
1722     }
1723
1724     @Test
1725     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1726         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1727
1728         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1729         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1730                 new FiniteDuration(1000, TimeUnit.SECONDS));
1731         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1732
1733         leaderActorContext.setReplicatedLog(
1734                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1735         long leaderCommitIndex = 3;
1736         leaderActorContext.setCommitIndex(leaderCommitIndex);
1737         leaderActorContext.setLastApplied(leaderCommitIndex);
1738
1739         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1740         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1741         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1742         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1743
1744         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1745
1746         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1747         followerActorContext.setCommitIndex(-1);
1748         followerActorContext.setLastApplied(-1);
1749
1750         Follower follower = new Follower(followerActorContext);
1751         followerActor.underlyingActor().setBehavior(follower);
1752         followerActorContext.setCurrentBehavior(follower);
1753
1754         leader = new Leader(leaderActorContext);
1755
1756         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1757         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1758
1759         MessageCollectorActor.clearMessages(followerActor);
1760         MessageCollectorActor.clearMessages(leaderActor);
1761
1762         // Verify initial AppendEntries sent with the leader's current commit index.
1763         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1764         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1765         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1766
1767         leaderActor.underlyingActor().setBehavior(leader);
1768         leaderActorContext.setCurrentBehavior(leader);
1769
1770         leader.handleMessage(followerActor, appendEntriesReply);
1771
1772         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1773         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1774
1775         appendEntries = appendEntriesList.get(0);
1776         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1777         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1778         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1779
1780         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1781         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1782                 appendEntries.getEntries().get(0).getData());
1783         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1784         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1785                 appendEntries.getEntries().get(1).getData());
1786
1787         appendEntries = appendEntriesList.get(1);
1788         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1789         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1790         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1791
1792         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1793         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1794                 appendEntries.getEntries().get(0).getData());
1795         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1796         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1797                 appendEntries.getEntries().get(1).getData());
1798
1799         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1800         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1801
1802         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1803
1804         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1805         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1806     }
1807
1808     @Test
1809     public void testHandleRequestVoteReply(){
1810         logStart("testHandleRequestVoteReply");
1811
1812         MockRaftActorContext leaderActorContext = createActorContext();
1813
1814         leader = new Leader(leaderActorContext);
1815
1816         // Should be a no-op.
1817         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1818                 new RequestVoteReply(1, true));
1819
1820         assertEquals(RaftState.Leader, raftActorBehavior.state());
1821
1822         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1823
1824         assertEquals(RaftState.Leader, raftActorBehavior.state());
1825     }
1826
1827     @Test
1828     public void testIsolatedLeaderCheckNoFollowers() {
1829         logStart("testIsolatedLeaderCheckNoFollowers");
1830
1831         MockRaftActorContext leaderActorContext = createActorContext();
1832
1833         leader = new Leader(leaderActorContext);
1834         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1835         assertTrue(behavior instanceof Leader);
1836     }
1837
1838     @Test
1839     public void testIsolatedLeaderCheckNoVotingFollowers() {
1840         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1841
1842         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1843         Follower follower = new Follower(followerActorContext);
1844         followerActor.underlyingActor().setBehavior(follower);
1845
1846         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1847         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1848                 new FiniteDuration(1000, TimeUnit.SECONDS));
1849         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1850
1851         leader = new Leader(leaderActorContext);
1852         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1853         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1854         assertTrue("Expected Leader", behavior instanceof Leader);
1855     }
1856
1857     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1858         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1859         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1860
1861         MockRaftActorContext leaderActorContext = createActorContext();
1862
1863         Map<String, String> peerAddresses = new HashMap<>();
1864         peerAddresses.put("follower-1", followerActor1.path().toString());
1865         peerAddresses.put("follower-2", followerActor2.path().toString());
1866
1867         leaderActorContext.setPeerAddresses(peerAddresses);
1868         leaderActorContext.setRaftPolicy(raftPolicy);
1869
1870         leader = new Leader(leaderActorContext);
1871
1872         leader.markFollowerActive("follower-1");
1873         leader.markFollowerActive("follower-2");
1874         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1875         assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1876
1877         // kill 1 follower and verify if that got killed
1878         final JavaTestKit probe = new JavaTestKit(getSystem());
1879         probe.watch(followerActor1);
1880         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1881         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1882         assertEquals(termMsg1.getActor(), followerActor1);
1883
1884         leader.markFollowerInActive("follower-1");
1885         leader.markFollowerActive("follower-2");
1886         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1887         assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1888
1889         // kill 2nd follower and leader should change to Isolated leader
1890         followerActor2.tell(PoisonPill.getInstance(), null);
1891         probe.watch(followerActor2);
1892         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1893         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1894         assertEquals(termMsg2.getActor(), followerActor2);
1895
1896         leader.markFollowerInActive("follower-2");
1897         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1898     }
1899
1900     @Test
1901     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1902         logStart("testIsolatedLeaderCheckTwoFollowers");
1903
1904         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1905
1906         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1907             behavior instanceof IsolatedLeader);
1908     }
1909
1910     @Test
1911     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1912         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1913
1914         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1915
1916         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1917                 behavior instanceof Leader);
1918     }
1919
1920     @Test
1921     public void testLaggingFollowerStarvation() throws Exception {
1922         logStart("testLaggingFollowerStarvation");
1923         new JavaTestKit(getSystem()) {{
1924             String leaderActorId = actorFactory.generateActorId("leader");
1925             String follower1ActorId = actorFactory.generateActorId("follower");
1926             String follower2ActorId = actorFactory.generateActorId("follower");
1927
1928             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1929                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1930             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1931             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1932
1933             MockRaftActorContext leaderActorContext =
1934                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1935
1936             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1937             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1938             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1939
1940             leaderActorContext.setConfigParams(configParams);
1941
1942             leaderActorContext.setReplicatedLog(
1943                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1944
1945             Map<String, String> peerAddresses = new HashMap<>();
1946             peerAddresses.put(follower1ActorId,
1947                     follower1Actor.path().toString());
1948             peerAddresses.put(follower2ActorId,
1949                     follower2Actor.path().toString());
1950
1951             leaderActorContext.setPeerAddresses(peerAddresses);
1952             leaderActorContext.getTermInformation().update(1, leaderActorId);
1953
1954             RaftActorBehavior leader = createBehavior(leaderActorContext);
1955
1956             leaderActor.underlyingActor().setBehavior(leader);
1957
1958             for(int i=1;i<6;i++) {
1959                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1960                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1961                 assertTrue(newBehavior == leader);
1962                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1963             }
1964
1965             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1966             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1967
1968             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1969                     heartbeats.size() > 1);
1970
1971             // Check if follower-2 got AppendEntries during this time and was not starved
1972             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1973
1974             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1975                     appendEntries.size() > 1);
1976
1977         }};
1978     }
1979
1980     @Test
1981     public void testReplicationConsensusWithNonVotingFollower() {
1982         logStart("testReplicationConsensusWithNonVotingFollower");
1983
1984         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1985         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1986                 new FiniteDuration(1000, TimeUnit.SECONDS));
1987
1988         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1989         leaderActorContext.setCommitIndex(-1);
1990
1991         String nonVotingFollowerId = "nonvoting-follower";
1992         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1993                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1994
1995         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1996
1997         leader = new Leader(leaderActorContext);
1998         leaderActorContext.setCurrentBehavior(leader);
1999
2000         // Ignore initial heartbeats
2001         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2002         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2003
2004         MessageCollectorActor.clearMessages(followerActor);
2005         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2006         MessageCollectorActor.clearMessages(leaderActor);
2007
2008         // Send a Replicate message and wait for AppendEntries.
2009         sendReplicate(leaderActorContext, 0);
2010
2011         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2013
2014         // Send reply only from the voting follower and verify consensus via ApplyState.
2015         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2016
2017         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2018
2019         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2020
2021         MessageCollectorActor.clearMessages(followerActor);
2022         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2023         MessageCollectorActor.clearMessages(leaderActor);
2024
2025         // Send another Replicate message
2026         sendReplicate(leaderActorContext, 1);
2027
2028         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2029         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2030                 AppendEntries.class);
2031         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2032         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2033
2034         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2035         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2036
2037         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2038
2039         // Send reply from the voting follower and verify consensus.
2040         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2041
2042         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2043     }
2044
2045     @Test
2046     public void testTransferLeadershipWithFollowerInSync() {
2047         logStart("testTransferLeadershipWithFollowerInSync");
2048
2049         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2051                 new FiniteDuration(1000, TimeUnit.SECONDS));
2052         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2053
2054         leader = new Leader(leaderActorContext);
2055         leaderActorContext.setCurrentBehavior(leader);
2056
2057         // Initial heartbeat
2058         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2060         MessageCollectorActor.clearMessages(followerActor);
2061
2062         sendReplicate(leaderActorContext, 0);
2063         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2064
2065         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2066         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2067         MessageCollectorActor.clearMessages(followerActor);
2068
2069         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2070         leader.transferLeadership(mockTransferCohort);
2071
2072         verify(mockTransferCohort, never()).transferComplete();
2073         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2074         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2075
2076         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2077         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2078
2079         // Leader should force an election timeout
2080         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2081
2082         verify(mockTransferCohort).transferComplete();
2083     }
2084
2085     @Test
2086     public void testTransferLeadershipWithEmptyLog() {
2087         logStart("testTransferLeadershipWithEmptyLog");
2088
2089         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2090         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2091                 new FiniteDuration(1000, TimeUnit.SECONDS));
2092         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2093
2094         leader = new Leader(leaderActorContext);
2095         leaderActorContext.setCurrentBehavior(leader);
2096
2097         // Initial heartbeat
2098         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100         MessageCollectorActor.clearMessages(followerActor);
2101
2102         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2103         leader.transferLeadership(mockTransferCohort);
2104
2105         verify(mockTransferCohort, never()).transferComplete();
2106         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2107         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2108
2109         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2110         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2111
2112         // Leader should force an election timeout
2113         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2114
2115         verify(mockTransferCohort).transferComplete();
2116     }
2117
2118     @Test
2119     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2120         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2121
2122         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2123         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2124                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2125
2126         leader = new Leader(leaderActorContext);
2127         leaderActorContext.setCurrentBehavior(leader);
2128
2129         // Initial heartbeat
2130         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2131         MessageCollectorActor.clearMessages(followerActor);
2132
2133         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2134         leader.transferLeadership(mockTransferCohort);
2135
2136         verify(mockTransferCohort, never()).transferComplete();
2137
2138         // Sync up the follower.
2139         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2140         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2141         MessageCollectorActor.clearMessages(followerActor);
2142
2143         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2144                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2145         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2146         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2148
2149         // Leader should force an election timeout
2150         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2151
2152         verify(mockTransferCohort).transferComplete();
2153     }
2154
2155     @Test
2156     public void testTransferLeadershipWithFollowerSyncTimeout() {
2157         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2158
2159         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2162         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2163         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2164
2165         leader = new Leader(leaderActorContext);
2166         leaderActorContext.setCurrentBehavior(leader);
2167
2168         // Initial heartbeat
2169         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2170         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2171         MessageCollectorActor.clearMessages(followerActor);
2172
2173         sendReplicate(leaderActorContext, 0);
2174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2175
2176         MessageCollectorActor.clearMessages(followerActor);
2177
2178         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2179         leader.transferLeadership(mockTransferCohort);
2180
2181         verify(mockTransferCohort, never()).transferComplete();
2182
2183         // Send heartbeats to time out the transfer.
2184         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2185             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2186                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2187             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2188         }
2189
2190         verify(mockTransferCohort).abortTransfer();
2191         verify(mockTransferCohort, never()).transferComplete();
2192         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2193     }
2194
2195     @Override
2196     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2197             ActorRef actorRef, RaftRPC rpc) throws Exception {
2198         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2199         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2200     }
2201
2202     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2203
2204         private final long electionTimeOutIntervalMillis;
2205         private final int snapshotChunkSize;
2206
2207         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2208             super();
2209             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2210             this.snapshotChunkSize = snapshotChunkSize;
2211         }
2212
2213         @Override
2214         public FiniteDuration getElectionTimeOutInterval() {
2215             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2216         }
2217
2218         @Override
2219         public int getSnapshotChunkSize() {
2220             return snapshotChunkSize;
2221         }
2222     }
2223 }