Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.google.protobuf.ByteString;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
39 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
42 import org.opendaylight.controller.cluster.raft.RaftState;
43 import org.opendaylight.controller.cluster.raft.RaftVersions;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.yangtools.concepts.Identifier;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class LeaderTest extends AbstractLeaderTest<Leader> {
70
71     static final String FOLLOWER_ID = "follower";
72     public static final String LEADER_ID = "leader";
73
74     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
75             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
76
77     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
78             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
79
80     private Leader leader;
81     private final short payloadVersion = 5;
82
83     @Override
84     @After
85     public void tearDown() throws Exception {
86         if(leader != null) {
87             leader.close();
88         }
89
90         super.tearDown();
91     }
92
93     @Test
94     public void testHandleMessageForUnknownMessage() throws Exception {
95         logStart("testHandleMessageForUnknownMessage");
96
97         leader = new Leader(createActorContext());
98
99         // handle message should null when it receives an unknown message
100         assertNull(leader.handleMessage(followerActor, "foo"));
101     }
102
103     @Test
104     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106
107         MockRaftActorContext actorContext = createActorContextWithFollower();
108         actorContext.setCommitIndex(-1);
109         short payloadVersion = (short)5;
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116         actorContext.setCurrentBehavior(leader);
117
118         // Leader should send an immediate heartbeat with no entries as follower is inactive.
119         long lastIndex = actorContext.getReplicatedLog().lastIndex();
120         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getTerm", term, appendEntries.getTerm());
122         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 0, appendEntries.getEntries().size());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126
127         // The follower would normally reply - simulate that explicitly here.
128         leader.handleMessage(followerActor, new AppendEntriesReply(
129                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
131
132         followerActor.underlyingActor().clear();
133
134         // Sleep for the heartbeat interval so AppendEntries is sent.
135         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
136                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137
138         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139
140         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143         assertEquals("Entries size", 1, appendEntries.getEntries().size());
144         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147     }
148
149
150     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
151         return sendReplicate(actorContext, 1, index);
152     }
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
155         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157                 term, index, payload);
158         actorContext.getReplicatedLog().append(newEntry);
159         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160     }
161
162     @Test
163     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165
166         MockRaftActorContext actorContext = createActorContextWithFollower();
167
168         long term = 1;
169         actorContext.getTermInformation().update(term, "");
170
171         leader = new Leader(actorContext);
172
173         // Leader will send an immediate heartbeat - ignore it.
174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175
176         // The follower would normally reply - simulate that explicitly here.
177         long lastIndex = actorContext.getReplicatedLog().lastIndex();
178         leader.handleMessage(followerActor, new AppendEntriesReply(
179                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
181
182         followerActor.underlyingActor().clear();
183
184         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185
186         // State should not change
187         assertTrue(raftBehavior instanceof Leader);
188
189         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192         assertEquals("Entries size", 1, appendEntries.getEntries().size());
193         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
197     }
198
199     @Test
200     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202
203         MockRaftActorContext actorContext = createActorContextWithFollower();
204         actorContext.setCommitIndex(-1);
205         actorContext.setLastApplied(-1);
206
207         // The raft context is initialized with a couple log entries. However the commitIndex
208         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
209         // committed and applied. Now it regains leadership with a higher term (2).
210         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
211         long newTerm = prevTerm + 1;
212         actorContext.getTermInformation().update(newTerm, "");
213
214         leader = new Leader(actorContext);
215         actorContext.setCurrentBehavior(leader);
216
217         // Leader will send an immediate heartbeat - ignore it.
218         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
219
220         // The follower replies with the leader's current last index and term, simulating that it is
221         // up to date with the leader.
222         long lastIndex = actorContext.getReplicatedLog().lastIndex();
223         leader.handleMessage(followerActor, new AppendEntriesReply(
224                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
225
226         // The commit index should not get updated even though consensus was reached. This is b/c the
227         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
228         // from previous terms by counting replicas".
229         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
230
231         followerActor.underlyingActor().clear();
232
233         // Now replicate a new entry with the new term 2.
234         long newIndex = lastIndex + 1;
235         sendReplicate(actorContext, newTerm, newIndex);
236
237         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
238         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
239         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
240         assertEquals("Entries size", 1, appendEntries.getEntries().size());
241         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
242         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
243         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
244
245         // The follower replies with success. The leader should now update the commit index to the new index
246         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
247         // prior entries are committed indirectly".
248         leader.handleMessage(followerActor, new AppendEntriesReply(
249                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
250
251         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
252     }
253
254     @Test
255     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
256         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
257
258         MockRaftActorContext actorContext = createActorContextWithFollower();
259         actorContext.setRaftPolicy(createRaftPolicy(true, true));
260
261         long term = 1;
262         actorContext.getTermInformation().update(term, "");
263
264         leader = new Leader(actorContext);
265
266         // Leader will send an immediate heartbeat - ignore it.
267         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
268
269         // The follower would normally reply - simulate that explicitly here.
270         long lastIndex = actorContext.getReplicatedLog().lastIndex();
271         leader.handleMessage(followerActor, new AppendEntriesReply(
272                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
273         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
274
275         followerActor.underlyingActor().clear();
276
277         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
278
279         // State should not change
280         assertTrue(raftBehavior instanceof Leader);
281
282         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
284         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
285         assertEquals("Entries size", 1, appendEntries.getEntries().size());
286         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
287         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
288         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
289         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
290     }
291
292     @Test
293     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
294         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
295
296         MockRaftActorContext actorContext = createActorContextWithFollower();
297         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
298             @Override
299             public FiniteDuration getHeartBeatInterval() {
300                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
301             }
302         });
303
304         long term = 1;
305         actorContext.getTermInformation().update(term, "");
306
307         leader = new Leader(actorContext);
308
309         // Leader will send an immediate heartbeat - ignore it.
310         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
311
312         // The follower would normally reply - simulate that explicitly here.
313         long lastIndex = actorContext.getReplicatedLog().lastIndex();
314         leader.handleMessage(followerActor, new AppendEntriesReply(
315                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
316         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
317
318         followerActor.underlyingActor().clear();
319
320         for(int i=0;i<5;i++) {
321             sendReplicate(actorContext, lastIndex+i+1);
322         }
323
324         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
325         // We expect only 1 message to be sent because of two reasons,
326         // - an append entries reply was not received
327         // - the heartbeat interval has not expired
328         // In this scenario if multiple messages are sent they would likely be duplicates
329         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
330     }
331
332     @Test
333     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
334         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
335
336         MockRaftActorContext actorContext = createActorContextWithFollower();
337         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
338             @Override
339             public FiniteDuration getHeartBeatInterval() {
340                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
341             }
342         });
343
344         long term = 1;
345         actorContext.getTermInformation().update(term, "");
346
347         leader = new Leader(actorContext);
348
349         // Leader will send an immediate heartbeat - ignore it.
350         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
351
352         // The follower would normally reply - simulate that explicitly here.
353         long lastIndex = actorContext.getReplicatedLog().lastIndex();
354         leader.handleMessage(followerActor, new AppendEntriesReply(
355                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
356         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
357
358         followerActor.underlyingActor().clear();
359
360         for(int i=0;i<3;i++) {
361             sendReplicate(actorContext, lastIndex+i+1);
362             leader.handleMessage(followerActor, new AppendEntriesReply(
363                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
364
365         }
366
367         for(int i=3;i<5;i++) {
368             sendReplicate(actorContext, lastIndex + i + 1);
369         }
370
371         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
372         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
373         // get sent to the follower - but not the 5th
374         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
375
376         for(int i=0;i<4;i++) {
377             long expected = allMessages.get(i).getEntries().get(0).getIndex();
378             assertEquals(expected, i+2);
379         }
380     }
381
382     @Test
383     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
384         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
385
386         MockRaftActorContext actorContext = createActorContextWithFollower();
387         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
388             @Override
389             public FiniteDuration getHeartBeatInterval() {
390                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
391             }
392         });
393
394         long term = 1;
395         actorContext.getTermInformation().update(term, "");
396
397         leader = new Leader(actorContext);
398
399         // Leader will send an immediate heartbeat - ignore it.
400         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
401
402         // The follower would normally reply - simulate that explicitly here.
403         long lastIndex = actorContext.getReplicatedLog().lastIndex();
404         leader.handleMessage(followerActor, new AppendEntriesReply(
405                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
406         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
407
408         followerActor.underlyingActor().clear();
409
410         sendReplicate(actorContext, lastIndex+1);
411
412         // Wait slightly longer than heartbeat duration
413         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
414
415         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
416
417         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
418         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
419
420         assertEquals(1, allMessages.get(0).getEntries().size());
421         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
422         assertEquals(1, allMessages.get(1).getEntries().size());
423         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
424
425     }
426
427     @Test
428     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
429         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
430
431         MockRaftActorContext actorContext = createActorContextWithFollower();
432         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
433             @Override
434             public FiniteDuration getHeartBeatInterval() {
435                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
436             }
437         });
438
439         long term = 1;
440         actorContext.getTermInformation().update(term, "");
441
442         leader = new Leader(actorContext);
443
444         // Leader will send an immediate heartbeat - ignore it.
445         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
446
447         // The follower would normally reply - simulate that explicitly here.
448         long lastIndex = actorContext.getReplicatedLog().lastIndex();
449         leader.handleMessage(followerActor, new AppendEntriesReply(
450                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
451         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
452
453         followerActor.underlyingActor().clear();
454
455         for(int i=0;i<3;i++) {
456             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
457             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
458         }
459
460         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
461         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
462     }
463
464     @Test
465     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
466         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
467
468         MockRaftActorContext actorContext = createActorContextWithFollower();
469         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470             @Override
471             public FiniteDuration getHeartBeatInterval() {
472                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
473             }
474         });
475
476         long term = 1;
477         actorContext.getTermInformation().update(term, "");
478
479         leader = new Leader(actorContext);
480
481         // Leader will send an immediate heartbeat - ignore it.
482         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483
484         // The follower would normally reply - simulate that explicitly here.
485         long lastIndex = actorContext.getReplicatedLog().lastIndex();
486         leader.handleMessage(followerActor, new AppendEntriesReply(
487                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489
490         followerActor.underlyingActor().clear();
491
492         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
493         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
494         sendReplicate(actorContext, lastIndex+1);
495
496         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
497         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
498
499         assertEquals(0, allMessages.get(0).getEntries().size());
500         assertEquals(1, allMessages.get(1).getEntries().size());
501     }
502
503
504     @Test
505     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
506         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
507
508         MockRaftActorContext actorContext = createActorContext();
509
510         leader = new Leader(actorContext);
511
512         actorContext.setLastApplied(0);
513
514         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
515         long term = actorContext.getTermInformation().getCurrentTerm();
516         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
517                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
518
519         actorContext.getReplicatedLog().append(newEntry);
520
521         final Identifier id = new MockIdentifier("state-id");
522         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
523
524         // State should not change
525         assertTrue(raftBehavior instanceof Leader);
526
527         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
528
529         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
530         // one since lastApplied state is 0.
531         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
532                 leaderActor, ApplyState.class);
533         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
534
535         for(int i = 0; i <= newLogIndex - 1; i++ ) {
536             ApplyState applyState = applyStateList.get(i);
537             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
538             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
539         }
540
541         ApplyState last = applyStateList.get((int) newLogIndex - 1);
542         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
543         assertEquals("getIdentifier", id, last.getIdentifier());
544     }
545
546     @Test
547     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
548         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
549
550         MockRaftActorContext actorContext = createActorContextWithFollower();
551
552         Map<String, String> leadersSnapshot = new HashMap<>();
553         leadersSnapshot.put("1", "A");
554         leadersSnapshot.put("2", "B");
555         leadersSnapshot.put("3", "C");
556
557         //clears leaders log
558         actorContext.getReplicatedLog().removeFrom(0);
559
560         final int commitIndex = 3;
561         final int snapshotIndex = 2;
562         final int newEntryIndex = 4;
563         final int snapshotTerm = 1;
564         final int currentTerm = 2;
565
566         // set the snapshot variables in replicatedlog
567         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569         actorContext.setCommitIndex(commitIndex);
570         //set follower timeout to 2 mins, helps during debugging
571         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
572
573         leader = new Leader(actorContext);
574
575         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
576         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
577
578         // new entry
579         ReplicatedLogImplEntry entry =
580                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
581                         new MockRaftActorContext.MockPayload("D"));
582
583         //update follower timestamp
584         leader.markFollowerActive(FOLLOWER_ID);
585
586         ByteString bs = toByteString(leadersSnapshot);
587         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
588                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
589         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
590                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
591         fts.setSnapshotBytes(bs);
592         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
593
594         //send first chunk and no InstallSnapshotReply received yet
595         fts.getNextChunk();
596         fts.incrementChunkIndex();
597
598         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
599                 TimeUnit.MILLISECONDS);
600
601         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
602
603         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
604
605         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
606
607         //InstallSnapshotReply received
608         fts.markSendStatus(true);
609
610         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
611
612         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
613
614         assertEquals(commitIndex, is.getLastIncludedIndex());
615     }
616
617     @Test
618     public void testSendAppendEntriesSnapshotScenario() throws Exception {
619         logStart("testSendAppendEntriesSnapshotScenario");
620
621         MockRaftActorContext actorContext = createActorContextWithFollower();
622
623         Map<String, String> leadersSnapshot = new HashMap<>();
624         leadersSnapshot.put("1", "A");
625         leadersSnapshot.put("2", "B");
626         leadersSnapshot.put("3", "C");
627
628         //clears leaders log
629         actorContext.getReplicatedLog().removeFrom(0);
630
631         final int followersLastIndex = 2;
632         final int snapshotIndex = 3;
633         final int newEntryIndex = 4;
634         final int snapshotTerm = 1;
635         final int currentTerm = 2;
636
637         // set the snapshot variables in replicatedlog
638         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640         actorContext.setCommitIndex(followersLastIndex);
641
642         leader = new Leader(actorContext);
643
644         // Leader will send an immediate heartbeat - ignore it.
645         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
646
647         // new entry
648         ReplicatedLogImplEntry entry =
649                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
650                         new MockRaftActorContext.MockPayload("D"));
651
652         actorContext.getReplicatedLog().append(entry);
653
654         //update follower timestamp
655         leader.markFollowerActive(FOLLOWER_ID);
656
657         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
658         RaftActorBehavior raftBehavior = leader.handleMessage(
659                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
660
661         assertTrue(raftBehavior instanceof Leader);
662
663         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
664     }
665
666     @Test
667     public void testInitiateInstallSnapshot() throws Exception {
668         logStart("testInitiateInstallSnapshot");
669
670         MockRaftActorContext actorContext = createActorContextWithFollower();
671
672         //clears leaders log
673         actorContext.getReplicatedLog().removeFrom(0);
674
675         final int followersLastIndex = 2;
676         final int snapshotIndex = 3;
677         final int newEntryIndex = 4;
678         final int snapshotTerm = 1;
679         final int currentTerm = 2;
680
681         // set the snapshot variables in replicatedlog
682         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
683         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
684         actorContext.setLastApplied(3);
685         actorContext.setCommitIndex(followersLastIndex);
686
687         leader = new Leader(actorContext);
688
689         // Leader will send an immediate heartbeat - ignore it.
690         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
691
692         // set the snapshot as absent and check if capture-snapshot is invoked.
693         leader.setSnapshot(null);
694
695         // new entry
696         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
697                 new MockRaftActorContext.MockPayload("D"));
698
699         actorContext.getReplicatedLog().append(entry);
700
701         //update follower timestamp
702         leader.markFollowerActive(FOLLOWER_ID);
703
704         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
705
706         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
707
708         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
709
710         assertTrue(cs.isInstallSnapshotInitiated());
711         assertEquals(3, cs.getLastAppliedIndex());
712         assertEquals(1, cs.getLastAppliedTerm());
713         assertEquals(4, cs.getLastIndex());
714         assertEquals(2, cs.getLastTerm());
715
716         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
717         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
718
719         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
720     }
721
722     @Test
723     public void testInitiateForceInstallSnapshot() throws Exception {
724         logStart("testInitiateForceInstallSnapshot");
725
726         MockRaftActorContext actorContext = createActorContextWithFollower();
727
728         final int followersLastIndex = 2;
729         final int snapshotIndex = -1;
730         final int newEntryIndex = 4;
731         final int snapshotTerm = -1;
732         final int currentTerm = 2;
733
734         // set the snapshot variables in replicatedlog
735         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
736         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
737         actorContext.setLastApplied(3);
738         actorContext.setCommitIndex(followersLastIndex);
739
740         actorContext.getReplicatedLog().removeFrom(0);
741
742         leader = new Leader(actorContext);
743         actorContext.setCurrentBehavior(leader);
744
745         // Leader will send an immediate heartbeat - ignore it.
746         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
747
748         // set the snapshot as absent and check if capture-snapshot is invoked.
749         leader.setSnapshot(null);
750
751         for(int i=0;i<4;i++) {
752             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
753                     new MockRaftActorContext.MockPayload("X" + i)));
754         }
755
756         // new entry
757         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
758                 new MockRaftActorContext.MockPayload("D"));
759
760         actorContext.getReplicatedLog().append(entry);
761
762         //update follower timestamp
763         leader.markFollowerActive(FOLLOWER_ID);
764
765         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
766         // installed with a SendInstallSnapshot
767         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
768
769         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
770
771         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
772
773         assertTrue(cs.isInstallSnapshotInitiated());
774         assertEquals(3, cs.getLastAppliedIndex());
775         assertEquals(1, cs.getLastAppliedTerm());
776         assertEquals(4, cs.getLastIndex());
777         assertEquals(2, cs.getLastTerm());
778
779         // if an initiate is started again when first is in progress, it should not initiate Capture
780         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
781
782         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
783     }
784
785
786     @Test
787     public void testInstallSnapshot() throws Exception {
788         logStart("testInstallSnapshot");
789
790         MockRaftActorContext actorContext = createActorContextWithFollower();
791
792         Map<String, String> leadersSnapshot = new HashMap<>();
793         leadersSnapshot.put("1", "A");
794         leadersSnapshot.put("2", "B");
795         leadersSnapshot.put("3", "C");
796
797         //clears leaders log
798         actorContext.getReplicatedLog().removeFrom(0);
799
800         final int lastAppliedIndex = 3;
801         final int snapshotIndex = 2;
802         final int snapshotTerm = 1;
803         final int currentTerm = 2;
804
805         // set the snapshot variables in replicatedlog
806         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
807         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
808         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
809         actorContext.setCommitIndex(lastAppliedIndex);
810         actorContext.setLastApplied(lastAppliedIndex);
811
812         leader = new Leader(actorContext);
813
814         // Initial heartbeat.
815         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
816
817         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
818         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
819
820         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
821                 Collections.<ReplicatedLogEntry>emptyList(),
822                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
823
824         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
825
826         assertTrue(raftBehavior instanceof Leader);
827
828         // check if installsnapshot gets called with the correct values.
829
830         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
831
832         assertNotNull(installSnapshot.getData());
833         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
834         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
835
836         assertEquals(currentTerm, installSnapshot.getTerm());
837     }
838
839     @Test
840     public void testForceInstallSnapshot() throws Exception {
841         logStart("testForceInstallSnapshot");
842
843         MockRaftActorContext actorContext = createActorContextWithFollower();
844
845         Map<String, String> leadersSnapshot = new HashMap<>();
846         leadersSnapshot.put("1", "A");
847         leadersSnapshot.put("2", "B");
848         leadersSnapshot.put("3", "C");
849
850         final int lastAppliedIndex = 3;
851         final int snapshotIndex = -1;
852         final int snapshotTerm = -1;
853         final int currentTerm = 2;
854
855         // set the snapshot variables in replicatedlog
856         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
857         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
858         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
859         actorContext.setCommitIndex(lastAppliedIndex);
860         actorContext.setLastApplied(lastAppliedIndex);
861
862         leader = new Leader(actorContext);
863
864         // Initial heartbeat.
865         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
866
867         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
868         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
869
870         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
871                 Collections.<ReplicatedLogEntry>emptyList(),
872                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
873
874         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
875
876         assertTrue(raftBehavior instanceof Leader);
877
878         // check if installsnapshot gets called with the correct values.
879
880         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
881
882         assertNotNull(installSnapshot.getData());
883         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
884         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
885
886         assertEquals(currentTerm, installSnapshot.getTerm());
887     }
888
889     @Test
890     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
891         logStart("testHandleInstallSnapshotReplyLastChunk");
892
893         MockRaftActorContext actorContext = createActorContextWithFollower();
894
895         final int commitIndex = 3;
896         final int snapshotIndex = 2;
897         final int snapshotTerm = 1;
898         final int currentTerm = 2;
899
900         actorContext.setCommitIndex(commitIndex);
901
902         leader = new Leader(actorContext);
903         actorContext.setCurrentBehavior(leader);
904
905         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
906         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
907
908         // Ignore initial heartbeat.
909         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
910
911         Map<String, String> leadersSnapshot = new HashMap<>();
912         leadersSnapshot.put("1", "A");
913         leadersSnapshot.put("2", "B");
914         leadersSnapshot.put("3", "C");
915
916         // set the snapshot variables in replicatedlog
917
918         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
919         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
920         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
921
922         ByteString bs = toByteString(leadersSnapshot);
923         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
924                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
925         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
926                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
927         fts.setSnapshotBytes(bs);
928         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
929         while(!fts.isLastChunk(fts.getChunkIndex())) {
930             fts.getNextChunk();
931             fts.incrementChunkIndex();
932         }
933
934         //clears leaders log
935         actorContext.getReplicatedLog().removeFrom(0);
936
937         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
938                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
939
940         assertTrue(raftBehavior instanceof Leader);
941
942         assertEquals(1, leader.followerLogSize());
943         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
944         assertNotNull(fli);
945         assertNull(fli.getInstallSnapshotState());
946         assertEquals(commitIndex, fli.getMatchIndex());
947         assertEquals(commitIndex + 1, fli.getNextIndex());
948         assertFalse(leader.hasSnapshot());
949     }
950
951     @Test
952     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
953         logStart("testSendSnapshotfromInstallSnapshotReply");
954
955         MockRaftActorContext actorContext = createActorContextWithFollower();
956
957         final int commitIndex = 3;
958         final int snapshotIndex = 2;
959         final int snapshotTerm = 1;
960         final int currentTerm = 2;
961
962         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
963             @Override
964             public int getSnapshotChunkSize() {
965                 return 50;
966             }
967         };
968         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
969         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
970
971         actorContext.setConfigParams(configParams);
972         actorContext.setCommitIndex(commitIndex);
973
974         leader = new Leader(actorContext);
975         actorContext.setCurrentBehavior(leader);
976
977         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
978         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
979
980         Map<String, String> leadersSnapshot = new HashMap<>();
981         leadersSnapshot.put("1", "A");
982         leadersSnapshot.put("2", "B");
983         leadersSnapshot.put("3", "C");
984
985         // set the snapshot variables in replicatedlog
986         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
987         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
988         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
989
990         ByteString bs = toByteString(leadersSnapshot);
991         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
992                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
993         leader.setSnapshot(snapshot);
994
995         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
996
997         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
998
999         assertEquals(1, installSnapshot.getChunkIndex());
1000         assertEquals(3, installSnapshot.getTotalChunks());
1001
1002         followerActor.underlyingActor().clear();
1003         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1004                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1005
1006         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1007
1008         assertEquals(2, installSnapshot.getChunkIndex());
1009         assertEquals(3, installSnapshot.getTotalChunks());
1010
1011         followerActor.underlyingActor().clear();
1012         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1013                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1014
1015         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1016
1017         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1018         followerActor.underlyingActor().clear();
1019         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1020                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1021
1022         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1023
1024         assertNull(installSnapshot);
1025     }
1026
1027
1028     @Test
1029     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1030         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1031
1032         MockRaftActorContext actorContext = createActorContextWithFollower();
1033
1034         final int commitIndex = 3;
1035         final int snapshotIndex = 2;
1036         final int snapshotTerm = 1;
1037         final int currentTerm = 2;
1038
1039         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1040             @Override
1041             public int getSnapshotChunkSize() {
1042                 return 50;
1043             }
1044         });
1045
1046         actorContext.setCommitIndex(commitIndex);
1047
1048         leader = new Leader(actorContext);
1049
1050         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1051         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1052
1053         Map<String, String> leadersSnapshot = new HashMap<>();
1054         leadersSnapshot.put("1", "A");
1055         leadersSnapshot.put("2", "B");
1056         leadersSnapshot.put("3", "C");
1057
1058         // set the snapshot variables in replicatedlog
1059         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1060         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1061         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1062
1063         ByteString bs = toByteString(leadersSnapshot);
1064         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1065                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1066         leader.setSnapshot(snapshot);
1067
1068         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1069         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1070
1071         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1072
1073         assertEquals(1, installSnapshot.getChunkIndex());
1074         assertEquals(3, installSnapshot.getTotalChunks());
1075
1076         followerActor.underlyingActor().clear();
1077
1078         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1079                 FOLLOWER_ID, -1, false));
1080
1081         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1082                 TimeUnit.MILLISECONDS);
1083
1084         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1085
1086         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1087
1088         assertEquals(1, installSnapshot.getChunkIndex());
1089         assertEquals(3, installSnapshot.getTotalChunks());
1090     }
1091
1092     @Test
1093     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1094         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1095
1096         MockRaftActorContext actorContext = createActorContextWithFollower();
1097
1098         final int commitIndex = 3;
1099         final int snapshotIndex = 2;
1100         final int snapshotTerm = 1;
1101         final int currentTerm = 2;
1102
1103         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1104             @Override
1105             public int getSnapshotChunkSize() {
1106                 return 50;
1107             }
1108         });
1109
1110         actorContext.setCommitIndex(commitIndex);
1111
1112         leader = new Leader(actorContext);
1113
1114         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1115         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1116
1117         Map<String, String> leadersSnapshot = new HashMap<>();
1118         leadersSnapshot.put("1", "A");
1119         leadersSnapshot.put("2", "B");
1120         leadersSnapshot.put("3", "C");
1121
1122         // set the snapshot variables in replicatedlog
1123         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1124         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1125         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1126
1127         ByteString bs = toByteString(leadersSnapshot);
1128         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1129                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1130         leader.setSnapshot(snapshot);
1131
1132         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1133
1134         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1135
1136         assertEquals(1, installSnapshot.getChunkIndex());
1137         assertEquals(3, installSnapshot.getTotalChunks());
1138         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1139                 installSnapshot.getLastChunkHashCode().get().intValue());
1140
1141         int hashCode = Arrays.hashCode(installSnapshot.getData());
1142
1143         followerActor.underlyingActor().clear();
1144
1145         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1146                 FOLLOWER_ID, 1, true));
1147
1148         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1149
1150         assertEquals(2, installSnapshot.getChunkIndex());
1151         assertEquals(3, installSnapshot.getTotalChunks());
1152         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1153     }
1154
1155     @Test
1156     public void testLeaderInstallSnapshotState() {
1157         logStart("testLeaderInstallSnapshotState");
1158
1159         Map<String, String> leadersSnapshot = new HashMap<>();
1160         leadersSnapshot.put("1", "A");
1161         leadersSnapshot.put("2", "B");
1162         leadersSnapshot.put("3", "C");
1163
1164         ByteString bs = toByteString(leadersSnapshot);
1165         byte[] barray = bs.toByteArray();
1166
1167         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1168         fts.setSnapshotBytes(bs);
1169
1170         assertEquals(bs.size(), barray.length);
1171
1172         int chunkIndex=0;
1173         for (int i=0; i < barray.length; i = i + 50) {
1174             int j = i + 50;
1175             chunkIndex++;
1176
1177             if (i + 50 > barray.length) {
1178                 j = barray.length;
1179             }
1180
1181             byte[] chunk = fts.getNextChunk();
1182             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1183             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1184
1185             fts.markSendStatus(true);
1186             if (!fts.isLastChunk(chunkIndex)) {
1187                 fts.incrementChunkIndex();
1188             }
1189         }
1190
1191         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1192     }
1193
1194     @Override
1195     protected Leader createBehavior(final RaftActorContext actorContext) {
1196         return new Leader(actorContext);
1197     }
1198
1199     @Override
1200     protected MockRaftActorContext createActorContext() {
1201         return createActorContext(leaderActor);
1202     }
1203
1204     @Override
1205     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1206         return createActorContext(LEADER_ID, actorRef);
1207     }
1208
1209     private MockRaftActorContext createActorContextWithFollower() {
1210         MockRaftActorContext actorContext = createActorContext();
1211         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1212                 followerActor.path().toString()).build());
1213         return actorContext;
1214     }
1215
1216     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1217         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1218         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1219         configParams.setElectionTimeoutFactor(100000);
1220         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1221         context.setConfigParams(configParams);
1222         context.setPayloadVersion(payloadVersion);
1223         return context;
1224     }
1225
1226     private MockRaftActorContext createFollowerActorContextWithLeader() {
1227         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1228         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1229         followerConfig.setElectionTimeoutFactor(10000);
1230         followerActorContext.setConfigParams(followerConfig);
1231         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1232         return followerActorContext;
1233     }
1234
1235     @Test
1236     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1237         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1238
1239         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1240
1241         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1242
1243         Follower follower = new Follower(followerActorContext);
1244         followerActor.underlyingActor().setBehavior(follower);
1245         followerActorContext.setCurrentBehavior(follower);
1246
1247         Map<String, String> peerAddresses = new HashMap<>();
1248         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1249
1250         leaderActorContext.setPeerAddresses(peerAddresses);
1251
1252         leaderActorContext.getReplicatedLog().removeFrom(0);
1253
1254         //create 3 entries
1255         leaderActorContext.setReplicatedLog(
1256                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1257
1258         leaderActorContext.setCommitIndex(1);
1259
1260         followerActorContext.getReplicatedLog().removeFrom(0);
1261
1262         // follower too has the exact same log entries and has the same commit index
1263         followerActorContext.setReplicatedLog(
1264                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1265
1266         followerActorContext.setCommitIndex(1);
1267
1268         leader = new Leader(leaderActorContext);
1269         leaderActorContext.setCurrentBehavior(leader);
1270
1271         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1272
1273         assertEquals(-1, appendEntries.getLeaderCommit());
1274         assertEquals(0, appendEntries.getEntries().size());
1275         assertEquals(0, appendEntries.getPrevLogIndex());
1276
1277         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1278                 leaderActor, AppendEntriesReply.class);
1279
1280         assertEquals(2, appendEntriesReply.getLogLastIndex());
1281         assertEquals(1, appendEntriesReply.getLogLastTerm());
1282
1283         // follower returns its next index
1284         assertEquals(2, appendEntriesReply.getLogLastIndex());
1285         assertEquals(1, appendEntriesReply.getLogLastTerm());
1286
1287         follower.close();
1288     }
1289
1290     @Test
1291     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1292         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1293
1294         MockRaftActorContext leaderActorContext = createActorContext();
1295
1296         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1297         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1298
1299         Follower follower = new Follower(followerActorContext);
1300         followerActor.underlyingActor().setBehavior(follower);
1301         followerActorContext.setCurrentBehavior(follower);
1302
1303         Map<String, String> leaderPeerAddresses = new HashMap<>();
1304         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1305
1306         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1307
1308         leaderActorContext.getReplicatedLog().removeFrom(0);
1309
1310         leaderActorContext.setReplicatedLog(
1311                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1312
1313         leaderActorContext.setCommitIndex(1);
1314
1315         followerActorContext.getReplicatedLog().removeFrom(0);
1316
1317         followerActorContext.setReplicatedLog(
1318                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1319
1320         // follower has the same log entries but its commit index > leaders commit index
1321         followerActorContext.setCommitIndex(2);
1322
1323         leader = new Leader(leaderActorContext);
1324
1325         // Initial heartbeat
1326         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1327
1328         assertEquals(-1, appendEntries.getLeaderCommit());
1329         assertEquals(0, appendEntries.getEntries().size());
1330         assertEquals(0, appendEntries.getPrevLogIndex());
1331
1332         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1333                 leaderActor, AppendEntriesReply.class);
1334
1335         assertEquals(2, appendEntriesReply.getLogLastIndex());
1336         assertEquals(1, appendEntriesReply.getLogLastTerm());
1337
1338         leaderActor.underlyingActor().setBehavior(follower);
1339         leader.handleMessage(followerActor, appendEntriesReply);
1340
1341         leaderActor.underlyingActor().clear();
1342         followerActor.underlyingActor().clear();
1343
1344         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1345                 TimeUnit.MILLISECONDS);
1346
1347         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1348
1349         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1350
1351         assertEquals(2, appendEntries.getLeaderCommit());
1352         assertEquals(0, appendEntries.getEntries().size());
1353         assertEquals(2, appendEntries.getPrevLogIndex());
1354
1355         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1356
1357         assertEquals(2, appendEntriesReply.getLogLastIndex());
1358         assertEquals(1, appendEntriesReply.getLogLastTerm());
1359
1360         assertEquals(2, followerActorContext.getCommitIndex());
1361
1362         follower.close();
1363     }
1364
1365     @Test
1366     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1367         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1368
1369         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1370         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1371                 new FiniteDuration(1000, TimeUnit.SECONDS));
1372
1373         leaderActorContext.setReplicatedLog(
1374                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1375         long leaderCommitIndex = 2;
1376         leaderActorContext.setCommitIndex(leaderCommitIndex);
1377         leaderActorContext.setLastApplied(leaderCommitIndex);
1378
1379         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1380         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1381
1382         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1383
1384         followerActorContext.setReplicatedLog(
1385                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1386         followerActorContext.setCommitIndex(0);
1387         followerActorContext.setLastApplied(0);
1388
1389         Follower follower = new Follower(followerActorContext);
1390         followerActor.underlyingActor().setBehavior(follower);
1391
1392         leader = new Leader(leaderActorContext);
1393
1394         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1395         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1396
1397         MessageCollectorActor.clearMessages(followerActor);
1398         MessageCollectorActor.clearMessages(leaderActor);
1399
1400         // Verify initial AppendEntries sent.
1401         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1402         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1403         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1404
1405         leaderActor.underlyingActor().setBehavior(leader);
1406
1407         leader.handleMessage(followerActor, appendEntriesReply);
1408
1409         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1410         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1411
1412         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1413         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1414         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1415
1416         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1417         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1418                 appendEntries.getEntries().get(0).getData());
1419         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1420         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1421                 appendEntries.getEntries().get(1).getData());
1422
1423         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1424         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1425
1426         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1427
1428         ApplyState applyState = applyStateList.get(0);
1429         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1430         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1431         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1432                 applyState.getReplicatedLogEntry().getData());
1433
1434         applyState = applyStateList.get(1);
1435         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1436         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1437         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1438                 applyState.getReplicatedLogEntry().getData());
1439
1440         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1441         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1442     }
1443
1444     @Test
1445     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1446         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1447
1448         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1449         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1450                 new FiniteDuration(1000, TimeUnit.SECONDS));
1451
1452         leaderActorContext.setReplicatedLog(
1453                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1454         long leaderCommitIndex = 1;
1455         leaderActorContext.setCommitIndex(leaderCommitIndex);
1456         leaderActorContext.setLastApplied(leaderCommitIndex);
1457
1458         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1459         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1460
1461         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1462
1463         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1464         followerActorContext.setCommitIndex(-1);
1465         followerActorContext.setLastApplied(-1);
1466
1467         Follower follower = new Follower(followerActorContext);
1468         followerActor.underlyingActor().setBehavior(follower);
1469         followerActorContext.setCurrentBehavior(follower);
1470
1471         leader = new Leader(leaderActorContext);
1472
1473         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1474         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1475
1476         MessageCollectorActor.clearMessages(followerActor);
1477         MessageCollectorActor.clearMessages(leaderActor);
1478
1479         // Verify initial AppendEntries sent with the leader's current commit index.
1480         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1481         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1482         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1483
1484         leaderActor.underlyingActor().setBehavior(leader);
1485         leaderActorContext.setCurrentBehavior(leader);
1486
1487         leader.handleMessage(followerActor, appendEntriesReply);
1488
1489         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1490         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1491
1492         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1493         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1494         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1495
1496         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1497         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1498                 appendEntries.getEntries().get(0).getData());
1499         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1500         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1501                 appendEntries.getEntries().get(1).getData());
1502
1503         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1504         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1505
1506         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1507
1508         ApplyState applyState = applyStateList.get(0);
1509         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1510         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1511         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1512                 applyState.getReplicatedLogEntry().getData());
1513
1514         applyState = applyStateList.get(1);
1515         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1516         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1518                 applyState.getReplicatedLogEntry().getData());
1519
1520         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1521         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1522     }
1523
1524     @Test
1525     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1526         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1527
1528         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1529         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1530                 new FiniteDuration(1000, TimeUnit.SECONDS));
1531
1532         leaderActorContext.setReplicatedLog(
1533                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1534         long leaderCommitIndex = 1;
1535         leaderActorContext.setCommitIndex(leaderCommitIndex);
1536         leaderActorContext.setLastApplied(leaderCommitIndex);
1537
1538         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1539         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1540
1541         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1542
1543         followerActorContext.setReplicatedLog(
1544                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1545         followerActorContext.setCommitIndex(-1);
1546         followerActorContext.setLastApplied(-1);
1547
1548         Follower follower = new Follower(followerActorContext);
1549         followerActor.underlyingActor().setBehavior(follower);
1550         followerActorContext.setCurrentBehavior(follower);
1551
1552         leader = new Leader(leaderActorContext);
1553
1554         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1555         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1556
1557         MessageCollectorActor.clearMessages(followerActor);
1558         MessageCollectorActor.clearMessages(leaderActor);
1559
1560         // Verify initial AppendEntries sent with the leader's current commit index.
1561         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1562         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1563         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1564
1565         leaderActor.underlyingActor().setBehavior(leader);
1566         leaderActorContext.setCurrentBehavior(leader);
1567
1568         leader.handleMessage(followerActor, appendEntriesReply);
1569
1570         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1571         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1572
1573         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1574         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1575         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1576
1577         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1578         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1579         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1580                 appendEntries.getEntries().get(0).getData());
1581         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1582         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1583         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1584                 appendEntries.getEntries().get(1).getData());
1585
1586         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1587         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1588
1589         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1590
1591         ApplyState applyState = applyStateList.get(0);
1592         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1593         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1594         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1595                 applyState.getReplicatedLogEntry().getData());
1596
1597         applyState = applyStateList.get(1);
1598         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1599         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1600         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1601                 applyState.getReplicatedLogEntry().getData());
1602
1603         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1604         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1605         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1606     }
1607
1608     @Test
1609     public void testHandleAppendEntriesReplyWithNewerTerm(){
1610         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1611
1612         MockRaftActorContext leaderActorContext = createActorContext();
1613         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1614                 new FiniteDuration(10000, TimeUnit.SECONDS));
1615
1616         leaderActorContext.setReplicatedLog(
1617                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1618
1619         leader = new Leader(leaderActorContext);
1620         leaderActor.underlyingActor().setBehavior(leader);
1621         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1622
1623         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1624
1625         assertEquals(false, appendEntriesReply.isSuccess());
1626         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1627
1628         MessageCollectorActor.clearMessages(leaderActor);
1629     }
1630
1631     @Test
1632     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1633         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1634
1635         MockRaftActorContext leaderActorContext = createActorContext();
1636         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1637                 new FiniteDuration(10000, TimeUnit.SECONDS));
1638
1639         leaderActorContext.setReplicatedLog(
1640                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1641         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1642
1643         leader = new Leader(leaderActorContext);
1644         leaderActor.underlyingActor().setBehavior(leader);
1645         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1646
1647         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1648
1649         assertEquals(false, appendEntriesReply.isSuccess());
1650         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1651
1652         MessageCollectorActor.clearMessages(leaderActor);
1653     }
1654
1655     @Test
1656     public void testHandleAppendEntriesReplySuccess() throws Exception {
1657         logStart("testHandleAppendEntriesReplySuccess");
1658
1659         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1660
1661         leaderActorContext.setReplicatedLog(
1662                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1663
1664         leaderActorContext.setCommitIndex(1);
1665         leaderActorContext.setLastApplied(1);
1666         leaderActorContext.getTermInformation().update(1, "leader");
1667
1668         leader = new Leader(leaderActorContext);
1669
1670         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1671
1672         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1673         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1674
1675         short payloadVersion = 5;
1676         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1677
1678         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1679
1680         assertEquals(RaftState.Leader, raftActorBehavior.state());
1681
1682         assertEquals(2, leaderActorContext.getCommitIndex());
1683
1684         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1685                 leaderActor, ApplyJournalEntries.class);
1686
1687         assertEquals(2, leaderActorContext.getLastApplied());
1688
1689         assertEquals(2, applyJournalEntries.getToIndex());
1690
1691         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1692                 ApplyState.class);
1693
1694         assertEquals(1,applyStateList.size());
1695
1696         ApplyState applyState = applyStateList.get(0);
1697
1698         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1699
1700         assertEquals(2, followerInfo.getMatchIndex());
1701         assertEquals(3, followerInfo.getNextIndex());
1702         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1703         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1704     }
1705
1706     @Test
1707     public void testHandleAppendEntriesReplyUnknownFollower(){
1708         logStart("testHandleAppendEntriesReplyUnknownFollower");
1709
1710         MockRaftActorContext leaderActorContext = createActorContext();
1711
1712         leader = new Leader(leaderActorContext);
1713
1714         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1715
1716         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1717
1718         assertEquals(RaftState.Leader, raftActorBehavior.state());
1719     }
1720
1721     @Test
1722     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1723         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1724
1725         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1726         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1727                 new FiniteDuration(1000, TimeUnit.SECONDS));
1728         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1729
1730         leaderActorContext.setReplicatedLog(
1731                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1732         long leaderCommitIndex = 3;
1733         leaderActorContext.setCommitIndex(leaderCommitIndex);
1734         leaderActorContext.setLastApplied(leaderCommitIndex);
1735
1736         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1737         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1738         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1739         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1740
1741         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1742
1743         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1744         followerActorContext.setCommitIndex(-1);
1745         followerActorContext.setLastApplied(-1);
1746
1747         Follower follower = new Follower(followerActorContext);
1748         followerActor.underlyingActor().setBehavior(follower);
1749         followerActorContext.setCurrentBehavior(follower);
1750
1751         leader = new Leader(leaderActorContext);
1752
1753         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1754         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1755
1756         MessageCollectorActor.clearMessages(followerActor);
1757         MessageCollectorActor.clearMessages(leaderActor);
1758
1759         // Verify initial AppendEntries sent with the leader's current commit index.
1760         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1761         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1762         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1763
1764         leaderActor.underlyingActor().setBehavior(leader);
1765         leaderActorContext.setCurrentBehavior(leader);
1766
1767         leader.handleMessage(followerActor, appendEntriesReply);
1768
1769         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1770         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1771
1772         appendEntries = appendEntriesList.get(0);
1773         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1774         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1775         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1776
1777         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1778         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1779                 appendEntries.getEntries().get(0).getData());
1780         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1781         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1782                 appendEntries.getEntries().get(1).getData());
1783
1784         appendEntries = appendEntriesList.get(1);
1785         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1786         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1787         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1788
1789         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1790         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1791                 appendEntries.getEntries().get(0).getData());
1792         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1793         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1794                 appendEntries.getEntries().get(1).getData());
1795
1796         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1797         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1798
1799         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1800
1801         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1802         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1803     }
1804
1805     @Test
1806     public void testHandleRequestVoteReply(){
1807         logStart("testHandleRequestVoteReply");
1808
1809         MockRaftActorContext leaderActorContext = createActorContext();
1810
1811         leader = new Leader(leaderActorContext);
1812
1813         // Should be a no-op.
1814         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1815                 new RequestVoteReply(1, true));
1816
1817         assertEquals(RaftState.Leader, raftActorBehavior.state());
1818
1819         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1820
1821         assertEquals(RaftState.Leader, raftActorBehavior.state());
1822     }
1823
1824     @Test
1825     public void testIsolatedLeaderCheckNoFollowers() {
1826         logStart("testIsolatedLeaderCheckNoFollowers");
1827
1828         MockRaftActorContext leaderActorContext = createActorContext();
1829
1830         leader = new Leader(leaderActorContext);
1831         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1832         assertTrue(behavior instanceof Leader);
1833     }
1834
1835     @Test
1836     public void testIsolatedLeaderCheckNoVotingFollowers() {
1837         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1838
1839         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1840         Follower follower = new Follower(followerActorContext);
1841         followerActor.underlyingActor().setBehavior(follower);
1842
1843         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1844         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1845                 new FiniteDuration(1000, TimeUnit.SECONDS));
1846         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1847
1848         leader = new Leader(leaderActorContext);
1849         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1850         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1851         assertTrue("Expected Leader", behavior instanceof Leader);
1852     }
1853
1854     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1855         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1856         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1857
1858         MockRaftActorContext leaderActorContext = createActorContext();
1859
1860         Map<String, String> peerAddresses = new HashMap<>();
1861         peerAddresses.put("follower-1", followerActor1.path().toString());
1862         peerAddresses.put("follower-2", followerActor2.path().toString());
1863
1864         leaderActorContext.setPeerAddresses(peerAddresses);
1865         leaderActorContext.setRaftPolicy(raftPolicy);
1866
1867         leader = new Leader(leaderActorContext);
1868
1869         leader.markFollowerActive("follower-1");
1870         leader.markFollowerActive("follower-2");
1871         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1872         assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1873
1874         // kill 1 follower and verify if that got killed
1875         final JavaTestKit probe = new JavaTestKit(getSystem());
1876         probe.watch(followerActor1);
1877         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1878         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1879         assertEquals(termMsg1.getActor(), followerActor1);
1880
1881         leader.markFollowerInActive("follower-1");
1882         leader.markFollowerActive("follower-2");
1883         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1884         assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1885
1886         // kill 2nd follower and leader should change to Isolated leader
1887         followerActor2.tell(PoisonPill.getInstance(), null);
1888         probe.watch(followerActor2);
1889         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1890         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1891         assertEquals(termMsg2.getActor(), followerActor2);
1892
1893         leader.markFollowerInActive("follower-2");
1894         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1895     }
1896
1897     @Test
1898     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1899         logStart("testIsolatedLeaderCheckTwoFollowers");
1900
1901         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1902
1903         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1904             behavior instanceof IsolatedLeader);
1905     }
1906
1907     @Test
1908     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1909         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1910
1911         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1912
1913         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1914                 behavior instanceof Leader);
1915     }
1916
1917     @Test
1918     public void testLaggingFollowerStarvation() throws Exception {
1919         logStart("testLaggingFollowerStarvation");
1920         new JavaTestKit(getSystem()) {{
1921             String leaderActorId = actorFactory.generateActorId("leader");
1922             String follower1ActorId = actorFactory.generateActorId("follower");
1923             String follower2ActorId = actorFactory.generateActorId("follower");
1924
1925             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1926                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1927             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1928             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1929
1930             MockRaftActorContext leaderActorContext =
1931                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1932
1933             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1934             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1935             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1936
1937             leaderActorContext.setConfigParams(configParams);
1938
1939             leaderActorContext.setReplicatedLog(
1940                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1941
1942             Map<String, String> peerAddresses = new HashMap<>();
1943             peerAddresses.put(follower1ActorId,
1944                     follower1Actor.path().toString());
1945             peerAddresses.put(follower2ActorId,
1946                     follower2Actor.path().toString());
1947
1948             leaderActorContext.setPeerAddresses(peerAddresses);
1949             leaderActorContext.getTermInformation().update(1, leaderActorId);
1950
1951             RaftActorBehavior leader = createBehavior(leaderActorContext);
1952
1953             leaderActor.underlyingActor().setBehavior(leader);
1954
1955             for(int i=1;i<6;i++) {
1956                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1957                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1958                 assertTrue(newBehavior == leader);
1959                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1960             }
1961
1962             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1963             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1964
1965             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1966                     heartbeats.size() > 1);
1967
1968             // Check if follower-2 got AppendEntries during this time and was not starved
1969             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1970
1971             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1972                     appendEntries.size() > 1);
1973
1974         }};
1975     }
1976
1977     @Test
1978     public void testReplicationConsensusWithNonVotingFollower() {
1979         logStart("testReplicationConsensusWithNonVotingFollower");
1980
1981         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1982         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1983                 new FiniteDuration(1000, TimeUnit.SECONDS));
1984
1985         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1986         leaderActorContext.setCommitIndex(-1);
1987         leaderActorContext.setLastApplied(-1);
1988
1989         String nonVotingFollowerId = "nonvoting-follower";
1990         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1991                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1992
1993         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1994
1995         leader = new Leader(leaderActorContext);
1996         leaderActorContext.setCurrentBehavior(leader);
1997
1998         // Ignore initial heartbeats
1999         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2000         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2001
2002         MessageCollectorActor.clearMessages(followerActor);
2003         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2004         MessageCollectorActor.clearMessages(leaderActor);
2005
2006         // Send a Replicate message and wait for AppendEntries.
2007         sendReplicate(leaderActorContext, 0);
2008
2009         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2010         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2011
2012         // Send reply only from the voting follower and verify consensus via ApplyState.
2013         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2014
2015         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2016
2017         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2018
2019         MessageCollectorActor.clearMessages(followerActor);
2020         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2021         MessageCollectorActor.clearMessages(leaderActor);
2022
2023         // Send another Replicate message
2024         sendReplicate(leaderActorContext, 1);
2025
2026         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2027         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2028                 AppendEntries.class);
2029         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2030         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2031
2032         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2033         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2034
2035         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2036
2037         // Send reply from the voting follower and verify consensus.
2038         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2039
2040         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2041     }
2042
2043     @Test
2044     public void testTransferLeadershipWithFollowerInSync() {
2045         logStart("testTransferLeadershipWithFollowerInSync");
2046
2047         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2048         leaderActorContext.setLastApplied(-1);
2049         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2050                 new FiniteDuration(1000, TimeUnit.SECONDS));
2051         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2052
2053         leader = new Leader(leaderActorContext);
2054         leaderActorContext.setCurrentBehavior(leader);
2055
2056         // Initial heartbeat
2057         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2058         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2059         MessageCollectorActor.clearMessages(followerActor);
2060
2061         sendReplicate(leaderActorContext, 0);
2062         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2063
2064         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2065         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2066         MessageCollectorActor.clearMessages(followerActor);
2067
2068         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2069         leader.transferLeadership(mockTransferCohort);
2070
2071         verify(mockTransferCohort, never()).transferComplete();
2072         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2074
2075         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2076         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2077
2078         // Leader should force an election timeout
2079         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2080
2081         verify(mockTransferCohort).transferComplete();
2082     }
2083
2084     @Test
2085     public void testTransferLeadershipWithEmptyLog() {
2086         logStart("testTransferLeadershipWithEmptyLog");
2087
2088         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2089         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2090                 new FiniteDuration(1000, TimeUnit.SECONDS));
2091         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2092
2093         leader = new Leader(leaderActorContext);
2094         leaderActorContext.setCurrentBehavior(leader);
2095
2096         // Initial heartbeat
2097         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2098         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2099         MessageCollectorActor.clearMessages(followerActor);
2100
2101         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2102         leader.transferLeadership(mockTransferCohort);
2103
2104         verify(mockTransferCohort, never()).transferComplete();
2105         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107
2108         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2109         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110
2111         // Leader should force an election timeout
2112         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2113
2114         verify(mockTransferCohort).transferComplete();
2115     }
2116
2117     @Test
2118     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2119         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2120
2121         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2122         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2123                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2124
2125         leader = new Leader(leaderActorContext);
2126         leaderActorContext.setCurrentBehavior(leader);
2127
2128         // Initial heartbeat
2129         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2130         MessageCollectorActor.clearMessages(followerActor);
2131
2132         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2133         leader.transferLeadership(mockTransferCohort);
2134
2135         verify(mockTransferCohort, never()).transferComplete();
2136
2137         // Sync up the follower.
2138         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2139         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2140         MessageCollectorActor.clearMessages(followerActor);
2141
2142         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2143                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2144         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2145         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2146         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2147
2148         // Leader should force an election timeout
2149         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2150
2151         verify(mockTransferCohort).transferComplete();
2152     }
2153
2154     @Test
2155     public void testTransferLeadershipWithFollowerSyncTimeout() {
2156         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2157
2158         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2159         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2160                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2161         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2162         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2163
2164         leader = new Leader(leaderActorContext);
2165         leaderActorContext.setCurrentBehavior(leader);
2166
2167         // Initial heartbeat
2168         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2169         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2170         MessageCollectorActor.clearMessages(followerActor);
2171
2172         sendReplicate(leaderActorContext, 0);
2173         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2174
2175         MessageCollectorActor.clearMessages(followerActor);
2176
2177         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2178         leader.transferLeadership(mockTransferCohort);
2179
2180         verify(mockTransferCohort, never()).transferComplete();
2181
2182         // Send heartbeats to time out the transfer.
2183         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2184             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2185                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2186             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2187         }
2188
2189         verify(mockTransferCohort).abortTransfer();
2190         verify(mockTransferCohort, never()).transferComplete();
2191         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2192     }
2193
2194     @Override
2195     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2196             ActorRef actorRef, RaftRPC rpc) throws Exception {
2197         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2198         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2199     }
2200
2201     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2202
2203         private final long electionTimeOutIntervalMillis;
2204         private final int snapshotChunkSize;
2205
2206         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2207             super();
2208             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2209             this.snapshotChunkSize = snapshotChunkSize;
2210         }
2211
2212         @Override
2213         public FiniteDuration getElectionTimeOutInterval() {
2214             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2215         }
2216
2217         @Override
2218         public int getSnapshotChunkSize() {
2219             return snapshotChunkSize;
2220         }
2221     }
2222 }