Bug in AbstractLeader replication consensus
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
35 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
36 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
37 import org.opendaylight.controller.cluster.raft.RaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
39 import org.opendaylight.controller.cluster.raft.RaftState;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
42 import org.opendaylight.controller.cluster.raft.SerializationUtils;
43 import org.opendaylight.controller.cluster.raft.Snapshot;
44 import org.opendaylight.controller.cluster.raft.VotingState;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
49 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
60 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
61 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
62 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
63 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
64 import scala.concurrent.duration.FiniteDuration;
65
66 public class LeaderTest extends AbstractLeaderTest {
67
68     static final String FOLLOWER_ID = "follower";
69     public static final String LEADER_ID = "leader";
70
71     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
72             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
73
74     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
75             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
76
77     private Leader leader;
78     private final short payloadVersion = 5;
79
80     @Override
81     @After
82     public void tearDown() throws Exception {
83         if(leader != null) {
84             leader.close();
85         }
86
87         super.tearDown();
88     }
89
90     @Test
91     public void testHandleMessageForUnknownMessage() throws Exception {
92         logStart("testHandleMessageForUnknownMessage");
93
94         leader = new Leader(createActorContext());
95
96         // handle message should return the Leader state when it receives an
97         // unknown message
98         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
99         Assert.assertTrue(behavior instanceof Leader);
100     }
101
102     @Test
103     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
105
106         MockRaftActorContext actorContext = createActorContextWithFollower();
107         short payloadVersion = (short)5;
108         actorContext.setPayloadVersion(payloadVersion);
109
110         long term = 1;
111         actorContext.getTermInformation().update(term, "");
112
113         leader = new Leader(actorContext);
114
115         // Leader should send an immediate heartbeat with no entries as follower is inactive.
116         long lastIndex = actorContext.getReplicatedLog().lastIndex();
117         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
118         assertEquals("getTerm", term, appendEntries.getTerm());
119         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
120         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
121         assertEquals("Entries size", 0, appendEntries.getEntries().size());
122         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
123
124         // The follower would normally reply - simulate that explicitly here.
125         leader.handleMessage(followerActor, new AppendEntriesReply(
126                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
127         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
128
129         followerActor.underlyingActor().clear();
130
131         // Sleep for the heartbeat interval so AppendEntries is sent.
132         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
133                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
134
135         leader.handleMessage(leaderActor, new SendHeartBeat());
136
137         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
138         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
139         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
140         assertEquals("Entries size", 1, appendEntries.getEntries().size());
141         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
142         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
143         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
144     }
145
146
147     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
148         return sendReplicate(actorContext, 1, index);
149     }
150
151     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
152         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
153         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
154                 term, index, payload);
155         actorContext.getReplicatedLog().append(newEntry);
156         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
157     }
158
159     @Test
160     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
161         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
162
163         MockRaftActorContext actorContext = createActorContextWithFollower();
164
165         long term = 1;
166         actorContext.getTermInformation().update(term, "");
167
168         leader = new Leader(actorContext);
169
170         // Leader will send an immediate heartbeat - ignore it.
171         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
172
173         // The follower would normally reply - simulate that explicitly here.
174         long lastIndex = actorContext.getReplicatedLog().lastIndex();
175         leader.handleMessage(followerActor, new AppendEntriesReply(
176                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
177         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
178
179         followerActor.underlyingActor().clear();
180
181         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
182
183         // State should not change
184         assertTrue(raftBehavior instanceof Leader);
185
186         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
187         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
188         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
189         assertEquals("Entries size", 1, appendEntries.getEntries().size());
190         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
191         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
192         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
193         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
194     }
195
196     @Test
197     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
198         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
199
200         MockRaftActorContext actorContext = createActorContextWithFollower();
201
202         // The raft context is initialized with a couple log entries. However the commitIndex
203         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
204         // committed and applied. Now it regains leadership with a higher term (2).
205         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
206         long newTerm = prevTerm + 1;
207         actorContext.getTermInformation().update(newTerm, "");
208
209         leader = new Leader(actorContext);
210
211         // Leader will send an immediate heartbeat - ignore it.
212         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
213
214         // The follower replies with the leader's current last index and term, simulating that it is
215         // up to date with the leader.
216         long lastIndex = actorContext.getReplicatedLog().lastIndex();
217         leader.handleMessage(followerActor, new AppendEntriesReply(
218                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
219
220         // The commit index should not get updated even though consensus was reached. This is b/c the
221         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
222         // from previous terms by counting replicas".
223         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
224
225         followerActor.underlyingActor().clear();
226
227         // Now replicate a new entry with the new term 2.
228         long newIndex = lastIndex + 1;
229         sendReplicate(actorContext, newTerm, newIndex);
230
231         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
232         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
233         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
234         assertEquals("Entries size", 1, appendEntries.getEntries().size());
235         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
236         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
237         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
238
239         // The follower replies with success. The leader should now update the commit index to the new index
240         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
241         // prior entries are committed indirectly".
242         leader.handleMessage(followerActor, new AppendEntriesReply(
243                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
244
245         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
246     }
247
248     @Test
249     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
250         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
251
252         MockRaftActorContext actorContext = createActorContextWithFollower();
253         actorContext.setRaftPolicy(createRaftPolicy(true, true));
254
255         long term = 1;
256         actorContext.getTermInformation().update(term, "");
257
258         leader = new Leader(actorContext);
259
260         // Leader will send an immediate heartbeat - ignore it.
261         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
262
263         // The follower would normally reply - simulate that explicitly here.
264         long lastIndex = actorContext.getReplicatedLog().lastIndex();
265         leader.handleMessage(followerActor, new AppendEntriesReply(
266                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
267         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
268
269         followerActor.underlyingActor().clear();
270
271         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
272
273         // State should not change
274         assertTrue(raftBehavior instanceof Leader);
275
276         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
277         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
278         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
279         assertEquals("Entries size", 1, appendEntries.getEntries().size());
280         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
281         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
282         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
283         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
284     }
285
286     @Test
287     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
288         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
289
290         MockRaftActorContext actorContext = createActorContextWithFollower();
291         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
292             @Override
293             public FiniteDuration getHeartBeatInterval() {
294                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
295             }
296         });
297
298         long term = 1;
299         actorContext.getTermInformation().update(term, "");
300
301         leader = new Leader(actorContext);
302
303         // Leader will send an immediate heartbeat - ignore it.
304         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
305
306         // The follower would normally reply - simulate that explicitly here.
307         long lastIndex = actorContext.getReplicatedLog().lastIndex();
308         leader.handleMessage(followerActor, new AppendEntriesReply(
309                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
310         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
311
312         followerActor.underlyingActor().clear();
313
314         for(int i=0;i<5;i++) {
315             sendReplicate(actorContext, lastIndex+i+1);
316         }
317
318         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
319         // We expect only 1 message to be sent because of two reasons,
320         // - an append entries reply was not received
321         // - the heartbeat interval has not expired
322         // In this scenario if multiple messages are sent they would likely be duplicates
323         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
324     }
325
326     @Test
327     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
328         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
329
330         MockRaftActorContext actorContext = createActorContextWithFollower();
331         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
332             @Override
333             public FiniteDuration getHeartBeatInterval() {
334                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
335             }
336         });
337
338         long term = 1;
339         actorContext.getTermInformation().update(term, "");
340
341         leader = new Leader(actorContext);
342
343         // Leader will send an immediate heartbeat - ignore it.
344         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
345
346         // The follower would normally reply - simulate that explicitly here.
347         long lastIndex = actorContext.getReplicatedLog().lastIndex();
348         leader.handleMessage(followerActor, new AppendEntriesReply(
349                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
350         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
351
352         followerActor.underlyingActor().clear();
353
354         for(int i=0;i<3;i++) {
355             sendReplicate(actorContext, lastIndex+i+1);
356             leader.handleMessage(followerActor, new AppendEntriesReply(
357                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
358
359         }
360
361         for(int i=3;i<5;i++) {
362             sendReplicate(actorContext, lastIndex + i + 1);
363         }
364
365         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
366         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
367         // get sent to the follower - but not the 5th
368         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
369
370         for(int i=0;i<4;i++) {
371             long expected = allMessages.get(i).getEntries().get(0).getIndex();
372             assertEquals(expected, i+2);
373         }
374     }
375
376     @Test
377     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
378         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
379
380         MockRaftActorContext actorContext = createActorContextWithFollower();
381         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
382             @Override
383             public FiniteDuration getHeartBeatInterval() {
384                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
385             }
386         });
387
388         long term = 1;
389         actorContext.getTermInformation().update(term, "");
390
391         leader = new Leader(actorContext);
392
393         // Leader will send an immediate heartbeat - ignore it.
394         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
395
396         // The follower would normally reply - simulate that explicitly here.
397         long lastIndex = actorContext.getReplicatedLog().lastIndex();
398         leader.handleMessage(followerActor, new AppendEntriesReply(
399                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
400         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
401
402         followerActor.underlyingActor().clear();
403
404         sendReplicate(actorContext, lastIndex+1);
405
406         // Wait slightly longer than heartbeat duration
407         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
408
409         leader.handleMessage(leaderActor, new SendHeartBeat());
410
411         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
412         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
413
414         assertEquals(1, allMessages.get(0).getEntries().size());
415         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
416         assertEquals(1, allMessages.get(1).getEntries().size());
417         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
418
419     }
420
421     @Test
422     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
423         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
424
425         MockRaftActorContext actorContext = createActorContextWithFollower();
426         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
427             @Override
428             public FiniteDuration getHeartBeatInterval() {
429                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
430             }
431         });
432
433         long term = 1;
434         actorContext.getTermInformation().update(term, "");
435
436         leader = new Leader(actorContext);
437
438         // Leader will send an immediate heartbeat - ignore it.
439         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
440
441         // The follower would normally reply - simulate that explicitly here.
442         long lastIndex = actorContext.getReplicatedLog().lastIndex();
443         leader.handleMessage(followerActor, new AppendEntriesReply(
444                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
445         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
446
447         followerActor.underlyingActor().clear();
448
449         for(int i=0;i<3;i++) {
450             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
451             leader.handleMessage(leaderActor, new SendHeartBeat());
452         }
453
454         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
455         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
456     }
457
458     @Test
459     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
460         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
461
462         MockRaftActorContext actorContext = createActorContextWithFollower();
463         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
464             @Override
465             public FiniteDuration getHeartBeatInterval() {
466                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
467             }
468         });
469
470         long term = 1;
471         actorContext.getTermInformation().update(term, "");
472
473         leader = new Leader(actorContext);
474
475         // Leader will send an immediate heartbeat - ignore it.
476         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
477
478         // The follower would normally reply - simulate that explicitly here.
479         long lastIndex = actorContext.getReplicatedLog().lastIndex();
480         leader.handleMessage(followerActor, new AppendEntriesReply(
481                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
482         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
483
484         followerActor.underlyingActor().clear();
485
486         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
487         leader.handleMessage(leaderActor, new SendHeartBeat());
488         sendReplicate(actorContext, lastIndex+1);
489
490         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
491         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
492
493         assertEquals(0, allMessages.get(0).getEntries().size());
494         assertEquals(1, allMessages.get(1).getEntries().size());
495     }
496
497
498     @Test
499     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
500         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
501
502         MockRaftActorContext actorContext = createActorContext();
503
504         leader = new Leader(actorContext);
505
506         actorContext.setLastApplied(0);
507
508         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
509         long term = actorContext.getTermInformation().getCurrentTerm();
510         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
511                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
512
513         actorContext.getReplicatedLog().append(newEntry);
514
515         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
516                 new Replicate(leaderActor, "state-id", newEntry));
517
518         // State should not change
519         assertTrue(raftBehavior instanceof Leader);
520
521         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
522
523         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
524         // one since lastApplied state is 0.
525         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
526                 leaderActor, ApplyState.class);
527         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
528
529         for(int i = 0; i <= newLogIndex - 1; i++ ) {
530             ApplyState applyState = applyStateList.get(i);
531             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
532             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
533         }
534
535         ApplyState last = applyStateList.get((int) newLogIndex - 1);
536         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
537         assertEquals("getIdentifier", "state-id", last.getIdentifier());
538     }
539
540     @Test
541     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
542         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
543
544         MockRaftActorContext actorContext = createActorContextWithFollower();
545
546         Map<String, String> leadersSnapshot = new HashMap<>();
547         leadersSnapshot.put("1", "A");
548         leadersSnapshot.put("2", "B");
549         leadersSnapshot.put("3", "C");
550
551         //clears leaders log
552         actorContext.getReplicatedLog().removeFrom(0);
553
554         final int commitIndex = 3;
555         final int snapshotIndex = 2;
556         final int newEntryIndex = 4;
557         final int snapshotTerm = 1;
558         final int currentTerm = 2;
559
560         // set the snapshot variables in replicatedlog
561         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563         actorContext.setCommitIndex(commitIndex);
564         //set follower timeout to 2 mins, helps during debugging
565         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
566
567         leader = new Leader(actorContext);
568
569         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
570         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
571
572         // new entry
573         ReplicatedLogImplEntry entry =
574                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
575                         new MockRaftActorContext.MockPayload("D"));
576
577         //update follower timestamp
578         leader.markFollowerActive(FOLLOWER_ID);
579
580         ByteString bs = toByteString(leadersSnapshot);
581         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
582                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
583         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
584         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
585
586         //send first chunk and no InstallSnapshotReply received yet
587         fts.getNextChunk();
588         fts.incrementChunkIndex();
589
590         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
591                 TimeUnit.MILLISECONDS);
592
593         leader.handleMessage(leaderActor, new SendHeartBeat());
594
595         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
596
597         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
598
599         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
600
601         //InstallSnapshotReply received
602         fts.markSendStatus(true);
603
604         leader.handleMessage(leaderActor, new SendHeartBeat());
605
606         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
607
608         assertEquals(commitIndex, is.getLastIncludedIndex());
609     }
610
611     @Test
612     public void testSendAppendEntriesSnapshotScenario() throws Exception {
613         logStart("testSendAppendEntriesSnapshotScenario");
614
615         MockRaftActorContext actorContext = createActorContextWithFollower();
616
617         Map<String, String> leadersSnapshot = new HashMap<>();
618         leadersSnapshot.put("1", "A");
619         leadersSnapshot.put("2", "B");
620         leadersSnapshot.put("3", "C");
621
622         //clears leaders log
623         actorContext.getReplicatedLog().removeFrom(0);
624
625         final int followersLastIndex = 2;
626         final int snapshotIndex = 3;
627         final int newEntryIndex = 4;
628         final int snapshotTerm = 1;
629         final int currentTerm = 2;
630
631         // set the snapshot variables in replicatedlog
632         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
633         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
634         actorContext.setCommitIndex(followersLastIndex);
635
636         leader = new Leader(actorContext);
637
638         // Leader will send an immediate heartbeat - ignore it.
639         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
640
641         // new entry
642         ReplicatedLogImplEntry entry =
643                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
644                         new MockRaftActorContext.MockPayload("D"));
645
646         actorContext.getReplicatedLog().append(entry);
647
648         //update follower timestamp
649         leader.markFollowerActive(FOLLOWER_ID);
650
651         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
652         RaftActorBehavior raftBehavior = leader.handleMessage(
653                 leaderActor, new Replicate(null, "state-id", entry));
654
655         assertTrue(raftBehavior instanceof Leader);
656
657         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
658     }
659
660     @Test
661     public void testInitiateInstallSnapshot() throws Exception {
662         logStart("testInitiateInstallSnapshot");
663
664         MockRaftActorContext actorContext = createActorContextWithFollower();
665
666         //clears leaders log
667         actorContext.getReplicatedLog().removeFrom(0);
668
669         final int followersLastIndex = 2;
670         final int snapshotIndex = 3;
671         final int newEntryIndex = 4;
672         final int snapshotTerm = 1;
673         final int currentTerm = 2;
674
675         // set the snapshot variables in replicatedlog
676         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
677         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
678         actorContext.setLastApplied(3);
679         actorContext.setCommitIndex(followersLastIndex);
680
681         leader = new Leader(actorContext);
682
683         // Leader will send an immediate heartbeat - ignore it.
684         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
685
686         // set the snapshot as absent and check if capture-snapshot is invoked.
687         leader.setSnapshot(null);
688
689         // new entry
690         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
691                 new MockRaftActorContext.MockPayload("D"));
692
693         actorContext.getReplicatedLog().append(entry);
694
695         //update follower timestamp
696         leader.markFollowerActive(FOLLOWER_ID);
697
698         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
699
700         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
701
702         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
703
704         assertTrue(cs.isInstallSnapshotInitiated());
705         assertEquals(3, cs.getLastAppliedIndex());
706         assertEquals(1, cs.getLastAppliedTerm());
707         assertEquals(4, cs.getLastIndex());
708         assertEquals(2, cs.getLastTerm());
709
710         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
711         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
712
713         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
714     }
715
716     @Test
717     public void testInitiateForceInstallSnapshot() throws Exception {
718         logStart("testInitiateForceInstallSnapshot");
719
720         MockRaftActorContext actorContext = createActorContextWithFollower();
721
722         final int followersLastIndex = 2;
723         final int snapshotIndex = -1;
724         final int newEntryIndex = 4;
725         final int snapshotTerm = -1;
726         final int currentTerm = 2;
727
728         // set the snapshot variables in replicatedlog
729         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
730         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
731         actorContext.setLastApplied(3);
732         actorContext.setCommitIndex(followersLastIndex);
733
734         actorContext.getReplicatedLog().removeFrom(0);
735
736         leader = new Leader(actorContext);
737
738         // Leader will send an immediate heartbeat - ignore it.
739         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
740
741         // set the snapshot as absent and check if capture-snapshot is invoked.
742         leader.setSnapshot(null);
743
744         for(int i=0;i<4;i++) {
745             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
746                     new MockRaftActorContext.MockPayload("X" + i)));
747         }
748
749         // new entry
750         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
751                 new MockRaftActorContext.MockPayload("D"));
752
753         actorContext.getReplicatedLog().append(entry);
754
755         //update follower timestamp
756         leader.markFollowerActive(FOLLOWER_ID);
757
758         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
759         // installed with a SendInstallSnapshot
760         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
761
762         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
763
764         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
765
766         assertTrue(cs.isInstallSnapshotInitiated());
767         assertEquals(3, cs.getLastAppliedIndex());
768         assertEquals(1, cs.getLastAppliedTerm());
769         assertEquals(4, cs.getLastIndex());
770         assertEquals(2, cs.getLastTerm());
771
772         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
773         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
774
775         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
776     }
777
778
779     @Test
780     public void testInstallSnapshot() throws Exception {
781         logStart("testInstallSnapshot");
782
783         MockRaftActorContext actorContext = createActorContextWithFollower();
784
785         Map<String, String> leadersSnapshot = new HashMap<>();
786         leadersSnapshot.put("1", "A");
787         leadersSnapshot.put("2", "B");
788         leadersSnapshot.put("3", "C");
789
790         //clears leaders log
791         actorContext.getReplicatedLog().removeFrom(0);
792
793         final int lastAppliedIndex = 3;
794         final int snapshotIndex = 2;
795         final int snapshotTerm = 1;
796         final int currentTerm = 2;
797
798         // set the snapshot variables in replicatedlog
799         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
800         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
801         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
802         actorContext.setCommitIndex(lastAppliedIndex);
803         actorContext.setLastApplied(lastAppliedIndex);
804
805         leader = new Leader(actorContext);
806
807         // Initial heartbeat.
808         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
809
810         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
811         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
812
813         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
814                 Collections.<ReplicatedLogEntry>emptyList(),
815                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
816
817         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
818
819         assertTrue(raftBehavior instanceof Leader);
820
821         // check if installsnapshot gets called with the correct values.
822
823         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
824
825         assertNotNull(installSnapshot.getData());
826         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
827         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
828
829         assertEquals(currentTerm, installSnapshot.getTerm());
830     }
831
832     @Test
833     public void testForceInstallSnapshot() throws Exception {
834         logStart("testForceInstallSnapshot");
835
836         MockRaftActorContext actorContext = createActorContextWithFollower();
837
838         Map<String, String> leadersSnapshot = new HashMap<>();
839         leadersSnapshot.put("1", "A");
840         leadersSnapshot.put("2", "B");
841         leadersSnapshot.put("3", "C");
842
843         final int lastAppliedIndex = 3;
844         final int snapshotIndex = -1;
845         final int snapshotTerm = -1;
846         final int currentTerm = 2;
847
848         // set the snapshot variables in replicatedlog
849         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
850         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
851         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
852         actorContext.setCommitIndex(lastAppliedIndex);
853         actorContext.setLastApplied(lastAppliedIndex);
854
855         leader = new Leader(actorContext);
856
857         // Initial heartbeat.
858         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
859
860         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
861         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
862
863         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
864                 Collections.<ReplicatedLogEntry>emptyList(),
865                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
866
867         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
868
869         assertTrue(raftBehavior instanceof Leader);
870
871         // check if installsnapshot gets called with the correct values.
872
873         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
874
875         assertNotNull(installSnapshot.getData());
876         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
877         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
878
879         assertEquals(currentTerm, installSnapshot.getTerm());
880     }
881
882     @Test
883     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
884         logStart("testHandleInstallSnapshotReplyLastChunk");
885
886         MockRaftActorContext actorContext = createActorContextWithFollower();
887
888         final int commitIndex = 3;
889         final int snapshotIndex = 2;
890         final int snapshotTerm = 1;
891         final int currentTerm = 2;
892
893         actorContext.setCommitIndex(commitIndex);
894
895         leader = new Leader(actorContext);
896
897         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
898         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
899
900         // Ignore initial heartbeat.
901         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
902
903         Map<String, String> leadersSnapshot = new HashMap<>();
904         leadersSnapshot.put("1", "A");
905         leadersSnapshot.put("2", "B");
906         leadersSnapshot.put("3", "C");
907
908         // set the snapshot variables in replicatedlog
909
910         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
911         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
912         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
913
914         ByteString bs = toByteString(leadersSnapshot);
915         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
916                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
917         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
918         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
919         while(!fts.isLastChunk(fts.getChunkIndex())) {
920             fts.getNextChunk();
921             fts.incrementChunkIndex();
922         }
923
924         //clears leaders log
925         actorContext.getReplicatedLog().removeFrom(0);
926
927         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
928                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
929
930         assertTrue(raftBehavior instanceof Leader);
931
932         assertEquals(0, leader.followerSnapshotSize());
933         assertEquals(1, leader.followerLogSize());
934         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
935         assertNotNull(fli);
936         assertEquals(commitIndex, fli.getMatchIndex());
937         assertEquals(commitIndex + 1, fli.getNextIndex());
938     }
939
940     @Test
941     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
942         logStart("testSendSnapshotfromInstallSnapshotReply");
943
944         MockRaftActorContext actorContext = createActorContextWithFollower();
945
946         final int commitIndex = 3;
947         final int snapshotIndex = 2;
948         final int snapshotTerm = 1;
949         final int currentTerm = 2;
950
951         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
952             @Override
953             public int getSnapshotChunkSize() {
954                 return 50;
955             }
956         };
957         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
958         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
959
960         actorContext.setConfigParams(configParams);
961         actorContext.setCommitIndex(commitIndex);
962
963         leader = new Leader(actorContext);
964
965         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
966         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
967
968         Map<String, String> leadersSnapshot = new HashMap<>();
969         leadersSnapshot.put("1", "A");
970         leadersSnapshot.put("2", "B");
971         leadersSnapshot.put("3", "C");
972
973         // set the snapshot variables in replicatedlog
974         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
975         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
976         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
977
978         ByteString bs = toByteString(leadersSnapshot);
979         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
980                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
981         leader.setSnapshot(snapshot);
982
983         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
984
985         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
986
987         assertEquals(1, installSnapshot.getChunkIndex());
988         assertEquals(3, installSnapshot.getTotalChunks());
989
990         followerActor.underlyingActor().clear();
991         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
992                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
993
994         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
995
996         assertEquals(2, installSnapshot.getChunkIndex());
997         assertEquals(3, installSnapshot.getTotalChunks());
998
999         followerActor.underlyingActor().clear();
1000         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1001                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1002
1003         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1004
1005         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1006         followerActor.underlyingActor().clear();
1007         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1008                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1009
1010         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1011
1012         Assert.assertNull(installSnapshot);
1013     }
1014
1015
1016     @Test
1017     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1018         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1019
1020         MockRaftActorContext actorContext = createActorContextWithFollower();
1021
1022         final int commitIndex = 3;
1023         final int snapshotIndex = 2;
1024         final int snapshotTerm = 1;
1025         final int currentTerm = 2;
1026
1027         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1028             @Override
1029             public int getSnapshotChunkSize() {
1030                 return 50;
1031             }
1032         });
1033
1034         actorContext.setCommitIndex(commitIndex);
1035
1036         leader = new Leader(actorContext);
1037
1038         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1039         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1040
1041         Map<String, String> leadersSnapshot = new HashMap<>();
1042         leadersSnapshot.put("1", "A");
1043         leadersSnapshot.put("2", "B");
1044         leadersSnapshot.put("3", "C");
1045
1046         // set the snapshot variables in replicatedlog
1047         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1048         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1049         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1050
1051         ByteString bs = toByteString(leadersSnapshot);
1052         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1053                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1054         leader.setSnapshot(snapshot);
1055
1056         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1057         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1058
1059         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1060
1061         assertEquals(1, installSnapshot.getChunkIndex());
1062         assertEquals(3, installSnapshot.getTotalChunks());
1063
1064         followerActor.underlyingActor().clear();
1065
1066         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1067                 FOLLOWER_ID, -1, false));
1068
1069         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1070                 TimeUnit.MILLISECONDS);
1071
1072         leader.handleMessage(leaderActor, new SendHeartBeat());
1073
1074         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1075
1076         assertEquals(1, installSnapshot.getChunkIndex());
1077         assertEquals(3, installSnapshot.getTotalChunks());
1078     }
1079
1080     @Test
1081     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1082         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1083
1084         MockRaftActorContext actorContext = createActorContextWithFollower();
1085
1086         final int commitIndex = 3;
1087         final int snapshotIndex = 2;
1088         final int snapshotTerm = 1;
1089         final int currentTerm = 2;
1090
1091         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1092             @Override
1093             public int getSnapshotChunkSize() {
1094                 return 50;
1095             }
1096         });
1097
1098         actorContext.setCommitIndex(commitIndex);
1099
1100         leader = new Leader(actorContext);
1101
1102         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1103         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1104
1105         Map<String, String> leadersSnapshot = new HashMap<>();
1106         leadersSnapshot.put("1", "A");
1107         leadersSnapshot.put("2", "B");
1108         leadersSnapshot.put("3", "C");
1109
1110         // set the snapshot variables in replicatedlog
1111         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1112         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1113         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1114
1115         ByteString bs = toByteString(leadersSnapshot);
1116         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1117                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1118         leader.setSnapshot(snapshot);
1119
1120         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1121
1122         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1123
1124         assertEquals(1, installSnapshot.getChunkIndex());
1125         assertEquals(3, installSnapshot.getTotalChunks());
1126         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1127
1128         int hashCode = installSnapshot.getData().hashCode();
1129
1130         followerActor.underlyingActor().clear();
1131
1132         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1133                 FOLLOWER_ID, 1, true));
1134
1135         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1136
1137         assertEquals(2, installSnapshot.getChunkIndex());
1138         assertEquals(3, installSnapshot.getTotalChunks());
1139         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1140     }
1141
1142     @Test
1143     public void testFollowerToSnapshotLogic() {
1144         logStart("testFollowerToSnapshotLogic");
1145
1146         MockRaftActorContext actorContext = createActorContext();
1147
1148         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1149             @Override
1150             public int getSnapshotChunkSize() {
1151                 return 50;
1152             }
1153         });
1154
1155         leader = new Leader(actorContext);
1156
1157         Map<String, String> leadersSnapshot = new HashMap<>();
1158         leadersSnapshot.put("1", "A");
1159         leadersSnapshot.put("2", "B");
1160         leadersSnapshot.put("3", "C");
1161
1162         ByteString bs = toByteString(leadersSnapshot);
1163         byte[] barray = bs.toByteArray();
1164
1165         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1166         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1167
1168         assertEquals(bs.size(), barray.length);
1169
1170         int chunkIndex=0;
1171         for (int i=0; i < barray.length; i = i + 50) {
1172             int j = i + 50;
1173             chunkIndex++;
1174
1175             if (i + 50 > barray.length) {
1176                 j = barray.length;
1177             }
1178
1179             ByteString chunk = fts.getNextChunk();
1180             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1181             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1182
1183             fts.markSendStatus(true);
1184             if (!fts.isLastChunk(chunkIndex)) {
1185                 fts.incrementChunkIndex();
1186             }
1187         }
1188
1189         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1190     }
1191
1192     @Override protected RaftActorBehavior createBehavior(
1193         RaftActorContext actorContext) {
1194         return new Leader(actorContext);
1195     }
1196
1197     @Override
1198     protected MockRaftActorContext createActorContext() {
1199         return createActorContext(leaderActor);
1200     }
1201
1202     @Override
1203     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1204         return createActorContext(LEADER_ID, actorRef);
1205     }
1206
1207     private MockRaftActorContext createActorContextWithFollower() {
1208         MockRaftActorContext actorContext = createActorContext();
1209         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1210                 followerActor.path().toString()).build());
1211         return actorContext;
1212     }
1213
1214     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1215         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1216         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1217         configParams.setElectionTimeoutFactor(100000);
1218         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1219         context.setConfigParams(configParams);
1220         context.setPayloadVersion(payloadVersion);
1221         return context;
1222     }
1223
1224     private MockRaftActorContext createFollowerActorContextWithLeader() {
1225         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1226         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1227         followerConfig.setElectionTimeoutFactor(10000);
1228         followerActorContext.setConfigParams(followerConfig);
1229         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1230         return followerActorContext;
1231     }
1232
1233     @Test
1234     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1235         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1236
1237         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1238
1239         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1240
1241         Follower follower = new Follower(followerActorContext);
1242         followerActor.underlyingActor().setBehavior(follower);
1243
1244         Map<String, String> peerAddresses = new HashMap<>();
1245         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1246
1247         leaderActorContext.setPeerAddresses(peerAddresses);
1248
1249         leaderActorContext.getReplicatedLog().removeFrom(0);
1250
1251         //create 3 entries
1252         leaderActorContext.setReplicatedLog(
1253                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1254
1255         leaderActorContext.setCommitIndex(1);
1256
1257         followerActorContext.getReplicatedLog().removeFrom(0);
1258
1259         // follower too has the exact same log entries and has the same commit index
1260         followerActorContext.setReplicatedLog(
1261                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1262
1263         followerActorContext.setCommitIndex(1);
1264
1265         leader = new Leader(leaderActorContext);
1266
1267         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1268
1269         assertEquals(1, appendEntries.getLeaderCommit());
1270         assertEquals(0, appendEntries.getEntries().size());
1271         assertEquals(0, appendEntries.getPrevLogIndex());
1272
1273         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1274                 leaderActor, AppendEntriesReply.class);
1275
1276         assertEquals(2, appendEntriesReply.getLogLastIndex());
1277         assertEquals(1, appendEntriesReply.getLogLastTerm());
1278
1279         // follower returns its next index
1280         assertEquals(2, appendEntriesReply.getLogLastIndex());
1281         assertEquals(1, appendEntriesReply.getLogLastTerm());
1282
1283         follower.close();
1284     }
1285
1286     @Test
1287     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1288         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1289
1290         MockRaftActorContext leaderActorContext = createActorContext();
1291
1292         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1293         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1294
1295         Follower follower = new Follower(followerActorContext);
1296         followerActor.underlyingActor().setBehavior(follower);
1297
1298         Map<String, String> leaderPeerAddresses = new HashMap<>();
1299         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1300
1301         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1302
1303         leaderActorContext.getReplicatedLog().removeFrom(0);
1304
1305         leaderActorContext.setReplicatedLog(
1306                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1307
1308         leaderActorContext.setCommitIndex(1);
1309
1310         followerActorContext.getReplicatedLog().removeFrom(0);
1311
1312         followerActorContext.setReplicatedLog(
1313                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1314
1315         // follower has the same log entries but its commit index > leaders commit index
1316         followerActorContext.setCommitIndex(2);
1317
1318         leader = new Leader(leaderActorContext);
1319
1320         // Initial heartbeat
1321         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1322
1323         assertEquals(1, appendEntries.getLeaderCommit());
1324         assertEquals(0, appendEntries.getEntries().size());
1325         assertEquals(0, appendEntries.getPrevLogIndex());
1326
1327         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1328                 leaderActor, AppendEntriesReply.class);
1329
1330         assertEquals(2, appendEntriesReply.getLogLastIndex());
1331         assertEquals(1, appendEntriesReply.getLogLastTerm());
1332
1333         leaderActor.underlyingActor().setBehavior(follower);
1334         leader.handleMessage(followerActor, appendEntriesReply);
1335
1336         leaderActor.underlyingActor().clear();
1337         followerActor.underlyingActor().clear();
1338
1339         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1340                 TimeUnit.MILLISECONDS);
1341
1342         leader.handleMessage(leaderActor, new SendHeartBeat());
1343
1344         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1345
1346         assertEquals(2, appendEntries.getLeaderCommit());
1347         assertEquals(0, appendEntries.getEntries().size());
1348         assertEquals(2, appendEntries.getPrevLogIndex());
1349
1350         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1351
1352         assertEquals(2, appendEntriesReply.getLogLastIndex());
1353         assertEquals(1, appendEntriesReply.getLogLastTerm());
1354
1355         assertEquals(2, followerActorContext.getCommitIndex());
1356
1357         follower.close();
1358     }
1359
1360     @Test
1361     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1362         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1363
1364         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1365         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1366                 new FiniteDuration(1000, TimeUnit.SECONDS));
1367
1368         leaderActorContext.setReplicatedLog(
1369                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1370         long leaderCommitIndex = 2;
1371         leaderActorContext.setCommitIndex(leaderCommitIndex);
1372         leaderActorContext.setLastApplied(leaderCommitIndex);
1373
1374         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1375         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1376
1377         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1378
1379         followerActorContext.setReplicatedLog(
1380                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1381         followerActorContext.setCommitIndex(0);
1382         followerActorContext.setLastApplied(0);
1383
1384         Follower follower = new Follower(followerActorContext);
1385         followerActor.underlyingActor().setBehavior(follower);
1386
1387         leader = new Leader(leaderActorContext);
1388
1389         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1390         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1391
1392         MessageCollectorActor.clearMessages(followerActor);
1393         MessageCollectorActor.clearMessages(leaderActor);
1394
1395         // Verify initial AppendEntries sent with the leader's current commit index.
1396         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1397         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1398         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1399
1400         leaderActor.underlyingActor().setBehavior(leader);
1401
1402         leader.handleMessage(followerActor, appendEntriesReply);
1403
1404         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1405         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1406
1407         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1408         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1409         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1410
1411         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1412         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1413                 appendEntries.getEntries().get(0).getData());
1414         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1415         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1416                 appendEntries.getEntries().get(1).getData());
1417
1418         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1419         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1420
1421         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1422
1423         ApplyState applyState = applyStateList.get(0);
1424         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1425         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1426         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1427                 applyState.getReplicatedLogEntry().getData());
1428
1429         applyState = applyStateList.get(1);
1430         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1431         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1432         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1433                 applyState.getReplicatedLogEntry().getData());
1434
1435         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1436         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1437     }
1438
1439     @Test
1440     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1441         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1442
1443         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1444         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1445                 new FiniteDuration(1000, TimeUnit.SECONDS));
1446
1447         leaderActorContext.setReplicatedLog(
1448                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1449         long leaderCommitIndex = 1;
1450         leaderActorContext.setCommitIndex(leaderCommitIndex);
1451         leaderActorContext.setLastApplied(leaderCommitIndex);
1452
1453         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1454         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1455
1456         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1457
1458         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1459         followerActorContext.setCommitIndex(-1);
1460         followerActorContext.setLastApplied(-1);
1461
1462         Follower follower = new Follower(followerActorContext);
1463         followerActor.underlyingActor().setBehavior(follower);
1464
1465         leader = new Leader(leaderActorContext);
1466
1467         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1468         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1469
1470         MessageCollectorActor.clearMessages(followerActor);
1471         MessageCollectorActor.clearMessages(leaderActor);
1472
1473         // Verify initial AppendEntries sent with the leader's current commit index.
1474         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1475         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1476         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1477
1478         leaderActor.underlyingActor().setBehavior(leader);
1479
1480         leader.handleMessage(followerActor, appendEntriesReply);
1481
1482         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1483         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1484
1485         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1486         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1487         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1488
1489         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1490         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1491                 appendEntries.getEntries().get(0).getData());
1492         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1493         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1494                 appendEntries.getEntries().get(1).getData());
1495
1496         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1497         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1498
1499         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1500
1501         ApplyState applyState = applyStateList.get(0);
1502         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1503         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1504         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1505                 applyState.getReplicatedLogEntry().getData());
1506
1507         applyState = applyStateList.get(1);
1508         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1509         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1510         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1511                 applyState.getReplicatedLogEntry().getData());
1512
1513         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1514         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1515     }
1516
1517     @Test
1518     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1519         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1520
1521         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1522         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1523                 new FiniteDuration(1000, TimeUnit.SECONDS));
1524
1525         leaderActorContext.setReplicatedLog(
1526                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1527         long leaderCommitIndex = 1;
1528         leaderActorContext.setCommitIndex(leaderCommitIndex);
1529         leaderActorContext.setLastApplied(leaderCommitIndex);
1530
1531         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1532         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1533
1534         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1535
1536         followerActorContext.setReplicatedLog(
1537                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1538         followerActorContext.setCommitIndex(-1);
1539         followerActorContext.setLastApplied(-1);
1540
1541         Follower follower = new Follower(followerActorContext);
1542         followerActor.underlyingActor().setBehavior(follower);
1543
1544         leader = new Leader(leaderActorContext);
1545
1546         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1547         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1548
1549         MessageCollectorActor.clearMessages(followerActor);
1550         MessageCollectorActor.clearMessages(leaderActor);
1551
1552         // Verify initial AppendEntries sent with the leader's current commit index.
1553         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1554         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1555         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1556
1557         leaderActor.underlyingActor().setBehavior(leader);
1558
1559         leader.handleMessage(followerActor, appendEntriesReply);
1560
1561         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1562         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1563
1564         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1565         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1566         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1567
1568         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1569         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1570         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1571                 appendEntries.getEntries().get(0).getData());
1572         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1573         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1574         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1575                 appendEntries.getEntries().get(1).getData());
1576
1577         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1578         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1579
1580         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1581
1582         ApplyState applyState = applyStateList.get(0);
1583         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1584         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1585         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1586                 applyState.getReplicatedLogEntry().getData());
1587
1588         applyState = applyStateList.get(1);
1589         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1590         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1591         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1592                 applyState.getReplicatedLogEntry().getData());
1593
1594         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1595         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1596         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1597     }
1598
1599     @Test
1600     public void testHandleAppendEntriesReplyWithNewerTerm(){
1601         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1602
1603         MockRaftActorContext leaderActorContext = createActorContext();
1604         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1605                 new FiniteDuration(10000, TimeUnit.SECONDS));
1606
1607         leaderActorContext.setReplicatedLog(
1608                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1609
1610         leader = new Leader(leaderActorContext);
1611         leaderActor.underlyingActor().setBehavior(leader);
1612         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1613
1614         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1615
1616         assertEquals(false, appendEntriesReply.isSuccess());
1617         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1618
1619         MessageCollectorActor.clearMessages(leaderActor);
1620     }
1621
1622     @Test
1623     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1624         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1625
1626         MockRaftActorContext leaderActorContext = createActorContext();
1627         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1628                 new FiniteDuration(10000, TimeUnit.SECONDS));
1629
1630         leaderActorContext.setReplicatedLog(
1631                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1632         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1633
1634         leader = new Leader(leaderActorContext);
1635         leaderActor.underlyingActor().setBehavior(leader);
1636         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1637
1638         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1639
1640         assertEquals(false, appendEntriesReply.isSuccess());
1641         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1642
1643         MessageCollectorActor.clearMessages(leaderActor);
1644     }
1645
1646     @Test
1647     public void testHandleAppendEntriesReplySuccess() throws Exception {
1648         logStart("testHandleAppendEntriesReplySuccess");
1649
1650         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1651
1652         leaderActorContext.setReplicatedLog(
1653                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1654
1655         leaderActorContext.setCommitIndex(1);
1656         leaderActorContext.setLastApplied(1);
1657         leaderActorContext.getTermInformation().update(1, "leader");
1658
1659         leader = new Leader(leaderActorContext);
1660
1661         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1662
1663         short payloadVersion = 5;
1664         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1665
1666         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1667
1668         assertEquals(RaftState.Leader, raftActorBehavior.state());
1669
1670         assertEquals(2, leaderActorContext.getCommitIndex());
1671
1672         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1673                 leaderActor, ApplyJournalEntries.class);
1674
1675         assertEquals(2, leaderActorContext.getLastApplied());
1676
1677         assertEquals(2, applyJournalEntries.getToIndex());
1678
1679         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1680                 ApplyState.class);
1681
1682         assertEquals(1,applyStateList.size());
1683
1684         ApplyState applyState = applyStateList.get(0);
1685
1686         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1687
1688         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1689         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1690     }
1691
1692     @Test
1693     public void testHandleAppendEntriesReplyUnknownFollower(){
1694         logStart("testHandleAppendEntriesReplyUnknownFollower");
1695
1696         MockRaftActorContext leaderActorContext = createActorContext();
1697
1698         leader = new Leader(leaderActorContext);
1699
1700         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1701
1702         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1703
1704         assertEquals(RaftState.Leader, raftActorBehavior.state());
1705     }
1706
1707     @Test
1708     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1709         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1710
1711         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1712         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1713                 new FiniteDuration(1000, TimeUnit.SECONDS));
1714         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1715
1716         leaderActorContext.setReplicatedLog(
1717                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1718         long leaderCommitIndex = 3;
1719         leaderActorContext.setCommitIndex(leaderCommitIndex);
1720         leaderActorContext.setLastApplied(leaderCommitIndex);
1721
1722         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1723         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1724         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1725         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1726
1727         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1728
1729         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1730         followerActorContext.setCommitIndex(-1);
1731         followerActorContext.setLastApplied(-1);
1732
1733         Follower follower = new Follower(followerActorContext);
1734         followerActor.underlyingActor().setBehavior(follower);
1735
1736         leader = new Leader(leaderActorContext);
1737
1738         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1739         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1740
1741         MessageCollectorActor.clearMessages(followerActor);
1742         MessageCollectorActor.clearMessages(leaderActor);
1743
1744         // Verify initial AppendEntries sent with the leader's current commit index.
1745         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1746         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1747         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1748
1749         leaderActor.underlyingActor().setBehavior(leader);
1750
1751         leader.handleMessage(followerActor, appendEntriesReply);
1752
1753         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1754         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1755
1756         appendEntries = appendEntriesList.get(0);
1757         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1758         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1759         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1760
1761         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1762         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1763                 appendEntries.getEntries().get(0).getData());
1764         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1765         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1766                 appendEntries.getEntries().get(1).getData());
1767
1768         appendEntries = appendEntriesList.get(1);
1769         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1770         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1771         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1772
1773         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1774         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1775                 appendEntries.getEntries().get(0).getData());
1776         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1777         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1778                 appendEntries.getEntries().get(1).getData());
1779
1780         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1781         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1782
1783         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1784
1785         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1786         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1787     }
1788
1789     @Test
1790     public void testHandleRequestVoteReply(){
1791         logStart("testHandleRequestVoteReply");
1792
1793         MockRaftActorContext leaderActorContext = createActorContext();
1794
1795         leader = new Leader(leaderActorContext);
1796
1797         // Should be a no-op.
1798         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1799                 new RequestVoteReply(1, true));
1800
1801         assertEquals(RaftState.Leader, raftActorBehavior.state());
1802
1803         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1804
1805         assertEquals(RaftState.Leader, raftActorBehavior.state());
1806     }
1807
1808     @Test
1809     public void testIsolatedLeaderCheckNoFollowers() {
1810         logStart("testIsolatedLeaderCheckNoFollowers");
1811
1812         MockRaftActorContext leaderActorContext = createActorContext();
1813
1814         leader = new Leader(leaderActorContext);
1815         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1816         Assert.assertTrue(behavior instanceof Leader);
1817     }
1818
1819     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1820         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1821         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1822
1823         MockRaftActorContext leaderActorContext = createActorContext();
1824
1825         Map<String, String> peerAddresses = new HashMap<>();
1826         peerAddresses.put("follower-1", followerActor1.path().toString());
1827         peerAddresses.put("follower-2", followerActor2.path().toString());
1828
1829         leaderActorContext.setPeerAddresses(peerAddresses);
1830         leaderActorContext.setRaftPolicy(raftPolicy);
1831
1832         leader = new Leader(leaderActorContext);
1833
1834         leader.markFollowerActive("follower-1");
1835         leader.markFollowerActive("follower-2");
1836         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1837         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1838                 behavior instanceof Leader);
1839
1840         // kill 1 follower and verify if that got killed
1841         final JavaTestKit probe = new JavaTestKit(getSystem());
1842         probe.watch(followerActor1);
1843         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1844         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1845         assertEquals(termMsg1.getActor(), followerActor1);
1846
1847         leader.markFollowerInActive("follower-1");
1848         leader.markFollowerActive("follower-2");
1849         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1850         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1851                 behavior instanceof Leader);
1852
1853         // kill 2nd follower and leader should change to Isolated leader
1854         followerActor2.tell(PoisonPill.getInstance(), null);
1855         probe.watch(followerActor2);
1856         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1857         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1858         assertEquals(termMsg2.getActor(), followerActor2);
1859
1860         leader.markFollowerInActive("follower-2");
1861         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1862     }
1863
1864     @Test
1865     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1866         logStart("testIsolatedLeaderCheckTwoFollowers");
1867
1868         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1869
1870         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1871             behavior instanceof IsolatedLeader);
1872     }
1873
1874     @Test
1875     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1876         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1877
1878         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1879
1880         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1881                 behavior instanceof Leader);
1882     }
1883
1884     @Test
1885     public void testLaggingFollowerStarvation() throws Exception {
1886         logStart("testLaggingFollowerStarvation");
1887         new JavaTestKit(getSystem()) {{
1888             String leaderActorId = actorFactory.generateActorId("leader");
1889             String follower1ActorId = actorFactory.generateActorId("follower");
1890             String follower2ActorId = actorFactory.generateActorId("follower");
1891
1892             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1893                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1894             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1895             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1896
1897             MockRaftActorContext leaderActorContext =
1898                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1899
1900             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1901             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1902             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1903
1904             leaderActorContext.setConfigParams(configParams);
1905
1906             leaderActorContext.setReplicatedLog(
1907                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1908
1909             Map<String, String> peerAddresses = new HashMap<>();
1910             peerAddresses.put(follower1ActorId,
1911                     follower1Actor.path().toString());
1912             peerAddresses.put(follower2ActorId,
1913                     follower2Actor.path().toString());
1914
1915             leaderActorContext.setPeerAddresses(peerAddresses);
1916             leaderActorContext.getTermInformation().update(1, leaderActorId);
1917
1918             RaftActorBehavior leader = createBehavior(leaderActorContext);
1919
1920             leaderActor.underlyingActor().setBehavior(leader);
1921
1922             for(int i=1;i<6;i++) {
1923                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1924                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1925                 assertTrue(newBehavior == leader);
1926                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1927             }
1928
1929             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1930             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1931
1932             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1933                     heartbeats.size() > 1);
1934
1935             // Check if follower-2 got AppendEntries during this time and was not starved
1936             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1937
1938             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1939                     appendEntries.size() > 1);
1940
1941         }};
1942     }
1943
1944     @Test
1945     public void testReplicationConsensusWithNonVotingFollower() {
1946         logStart("testReplicationConsensusWithNonVotingFollower");
1947
1948         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1949         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1950                 new FiniteDuration(1000, TimeUnit.SECONDS));
1951
1952         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1953
1954         String nonVotingFollowerId = "nonvoting-follower";
1955         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1956                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1957
1958         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1959
1960         leader = new Leader(leaderActorContext);
1961
1962         // Ignore initial heartbeats
1963         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1964         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1965
1966         MessageCollectorActor.clearMessages(followerActor);
1967         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1968         MessageCollectorActor.clearMessages(leaderActor);
1969
1970         // Send a Replicate message and wait for AppendEntries.
1971         sendReplicate(leaderActorContext, 0);
1972
1973         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1974         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1975
1976         // Send reply only from the voting follower and verify consensus via ApplyState.
1977         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1978
1979         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1980
1981         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1982
1983         MessageCollectorActor.clearMessages(followerActor);
1984         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1985         MessageCollectorActor.clearMessages(leaderActor);
1986
1987         // Send another Replicate message
1988         sendReplicate(leaderActorContext, 1);
1989
1990         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1991         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1992                 AppendEntries.class);
1993         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1994         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1995
1996         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1997         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1998
1999         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2000
2001         // Send reply from the voting follower and verify consensus.
2002         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2003
2004         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2005     }
2006
2007     @Test
2008     public void testTransferLeadershipWithFollowerInSync() {
2009         logStart("testTransferLeadershipWithFollowerInSync");
2010
2011         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2012         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2013                 new FiniteDuration(1000, TimeUnit.SECONDS));
2014         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2015
2016         leader = new Leader(leaderActorContext);
2017
2018         // Initial heartbeat
2019         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2020         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2021         MessageCollectorActor.clearMessages(followerActor);
2022
2023         sendReplicate(leaderActorContext, 0);
2024         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2025
2026         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2027         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2028         MessageCollectorActor.clearMessages(followerActor);
2029
2030         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2031         leader.transferLeadership(mockTransferCohort);
2032
2033         verify(mockTransferCohort, never()).transferComplete();
2034         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2035         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2036
2037         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2038         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2039
2040         // Leader should force an election timeout
2041         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2042
2043         verify(mockTransferCohort).transferComplete();
2044     }
2045
2046     @Test
2047     public void testTransferLeadershipWithEmptyLog() {
2048         logStart("testTransferLeadershipWithEmptyLog");
2049
2050         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052                 new FiniteDuration(1000, TimeUnit.SECONDS));
2053         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2054
2055         leader = new Leader(leaderActorContext);
2056
2057         // Initial heartbeat
2058         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2060         MessageCollectorActor.clearMessages(followerActor);
2061
2062         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2063         leader.transferLeadership(mockTransferCohort);
2064
2065         verify(mockTransferCohort, never()).transferComplete();
2066         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2067         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2068
2069         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2070         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2071
2072         // Leader should force an election timeout
2073         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2074
2075         verify(mockTransferCohort).transferComplete();
2076     }
2077
2078     @Test
2079     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2080         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2081
2082         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2083         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2084                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2085
2086         leader = new Leader(leaderActorContext);
2087
2088         // Initial heartbeat
2089         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2090         MessageCollectorActor.clearMessages(followerActor);
2091
2092         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2093         leader.transferLeadership(mockTransferCohort);
2094
2095         verify(mockTransferCohort, never()).transferComplete();
2096
2097         // Sync up the follower.
2098         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2099         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2100         MessageCollectorActor.clearMessages(followerActor);
2101
2102         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2103                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2104         leader.handleMessage(leaderActor, new SendHeartBeat());
2105         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2107
2108         // Leader should force an election timeout
2109         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2110
2111         verify(mockTransferCohort).transferComplete();
2112     }
2113
2114     @Test
2115     public void testTransferLeadershipWithFollowerSyncTimeout() {
2116         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2117
2118         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2119         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2120                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2121         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2122         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2123
2124         leader = new Leader(leaderActorContext);
2125
2126         // Initial heartbeat
2127         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2128         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2129         MessageCollectorActor.clearMessages(followerActor);
2130
2131         sendReplicate(leaderActorContext, 0);
2132         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133
2134         MessageCollectorActor.clearMessages(followerActor);
2135
2136         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2137         leader.transferLeadership(mockTransferCohort);
2138
2139         verify(mockTransferCohort, never()).transferComplete();
2140
2141         // Send heartbeats to time out the transfer.
2142         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2143             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2144                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2145             leader.handleMessage(leaderActor, new SendHeartBeat());
2146         }
2147
2148         verify(mockTransferCohort).abortTransfer();
2149         verify(mockTransferCohort, never()).transferComplete();
2150         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2151     }
2152
2153     @Override
2154     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2155             ActorRef actorRef, RaftRPC rpc) throws Exception {
2156         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2157         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2158     }
2159
2160     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2161
2162         private final long electionTimeOutIntervalMillis;
2163         private final int snapshotChunkSize;
2164
2165         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2166             super();
2167             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2168             this.snapshotChunkSize = snapshotChunkSize;
2169         }
2170
2171         @Override
2172         public FiniteDuration getElectionTimeOutInterval() {
2173             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2174         }
2175
2176         @Override
2177         public int getSnapshotChunkSize() {
2178             return snapshotChunkSize;
2179         }
2180     }
2181 }