b0d220a0d6fbd31db20992b2abd011bb20196d86
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
40 import org.opendaylight.controller.cluster.raft.RaftState;
41 import org.opendaylight.controller.cluster.raft.RaftVersions;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
44 import org.opendaylight.controller.cluster.raft.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.Snapshot;
46 import org.opendaylight.controller.cluster.raft.VotingState;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
65 import scala.concurrent.duration.FiniteDuration;
66
67 public class LeaderTest extends AbstractLeaderTest {
68
    // Member id used for the follower peer: it is the follower id placed in simulated
    // AppendEntriesReply messages and the key passed to leader.getFollower(...).
    static final String FOLLOWER_ID = "follower";
    // NOTE(review): not referenced in the visible part of this file - presumably used by other tests.
    public static final String LEADER_ID = "leader";

    // Test actor passed as the sender of leader-local messages (Replicate, SendHeartBeat); tests also
    // query it for the ApplyState messages it has collected.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; tests inspect the AppendEntries / InstallSnapshot
    // messages it has collected and clear them between phases.
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Leader behavior under test; created by each test and closed in tearDown().
    private Leader leader;
    // NOTE(review): not referenced in the visible part of this file (one test declares a local with the
    // same name that shadows it) - confirm whether later tests rely on it.
    private final short payloadVersion = 5;
80
81     @Override
82     @After
83     public void tearDown() throws Exception {
84         if(leader != null) {
85             leader.close();
86         }
87
88         super.tearDown();
89     }
90
91     @Test
92     public void testHandleMessageForUnknownMessage() throws Exception {
93         logStart("testHandleMessageForUnknownMessage");
94
95         leader = new Leader(createActorContext());
96
97         // handle message should return the Leader state when it receives an
98         // unknown message
99         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
100         Assert.assertTrue(behavior instanceof Leader);
101     }
102
103     @Test
104     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106
107         MockRaftActorContext actorContext = createActorContextWithFollower();
108         short payloadVersion = (short)5;
109         actorContext.setPayloadVersion(payloadVersion);
110
111         long term = 1;
112         actorContext.getTermInformation().update(term, "");
113
114         leader = new Leader(actorContext);
115
116         // Leader should send an immediate heartbeat with no entries as follower is inactive.
117         long lastIndex = actorContext.getReplicatedLog().lastIndex();
118         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119         assertEquals("getTerm", term, appendEntries.getTerm());
120         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
121         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
122         assertEquals("Entries size", 0, appendEntries.getEntries().size());
123         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
124
125         // The follower would normally reply - simulate that explicitly here.
126         leader.handleMessage(followerActor, new AppendEntriesReply(
127                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
128         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
129
130         followerActor.underlyingActor().clear();
131
132         // Sleep for the heartbeat interval so AppendEntries is sent.
133         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
134                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
135
136         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
137
138         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
139         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
140         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
141         assertEquals("Entries size", 1, appendEntries.getEntries().size());
142         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
143         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
144         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
145     }
146
147
148     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
149         return sendReplicate(actorContext, 1, index);
150     }
151
152     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
153         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155                 term, index, payload);
156         actorContext.getReplicatedLog().append(newEntry);
157         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
158     }
159
160     @Test
161     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
162         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
163
164         MockRaftActorContext actorContext = createActorContextWithFollower();
165
166         long term = 1;
167         actorContext.getTermInformation().update(term, "");
168
169         leader = new Leader(actorContext);
170
171         // Leader will send an immediate heartbeat - ignore it.
172         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
173
174         // The follower would normally reply - simulate that explicitly here.
175         long lastIndex = actorContext.getReplicatedLog().lastIndex();
176         leader.handleMessage(followerActor, new AppendEntriesReply(
177                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
178         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
179
180         followerActor.underlyingActor().clear();
181
182         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
183
184         // State should not change
185         assertTrue(raftBehavior instanceof Leader);
186
187         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
188         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
189         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
190         assertEquals("Entries size", 1, appendEntries.getEntries().size());
191         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
192         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
193         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
194         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
195     }
196
197     @Test
198     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
199         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
200
201         MockRaftActorContext actorContext = createActorContextWithFollower();
202
203         // The raft context is initialized with a couple log entries. However the commitIndex
204         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
205         // committed and applied. Now it regains leadership with a higher term (2).
206         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
207         long newTerm = prevTerm + 1;
208         actorContext.getTermInformation().update(newTerm, "");
209
210         leader = new Leader(actorContext);
211
212         // Leader will send an immediate heartbeat - ignore it.
213         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
214
215         // The follower replies with the leader's current last index and term, simulating that it is
216         // up to date with the leader.
217         long lastIndex = actorContext.getReplicatedLog().lastIndex();
218         leader.handleMessage(followerActor, new AppendEntriesReply(
219                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
220
221         // The commit index should not get updated even though consensus was reached. This is b/c the
222         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
223         // from previous terms by counting replicas".
224         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
225
226         followerActor.underlyingActor().clear();
227
228         // Now replicate a new entry with the new term 2.
229         long newIndex = lastIndex + 1;
230         sendReplicate(actorContext, newTerm, newIndex);
231
232         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
234         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
235         assertEquals("Entries size", 1, appendEntries.getEntries().size());
236         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
237         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
238         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
239
240         // The follower replies with success. The leader should now update the commit index to the new index
241         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
242         // prior entries are committed indirectly".
243         leader.handleMessage(followerActor, new AppendEntriesReply(
244                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
245
246         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
247     }
248
249     @Test
250     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
251         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
252
253         MockRaftActorContext actorContext = createActorContextWithFollower();
254         actorContext.setRaftPolicy(createRaftPolicy(true, true));
255
256         long term = 1;
257         actorContext.getTermInformation().update(term, "");
258
259         leader = new Leader(actorContext);
260
261         // Leader will send an immediate heartbeat - ignore it.
262         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
263
264         // The follower would normally reply - simulate that explicitly here.
265         long lastIndex = actorContext.getReplicatedLog().lastIndex();
266         leader.handleMessage(followerActor, new AppendEntriesReply(
267                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
268         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
269
270         followerActor.underlyingActor().clear();
271
272         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
273
274         // State should not change
275         assertTrue(raftBehavior instanceof Leader);
276
277         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
278         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
279         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
280         assertEquals("Entries size", 1, appendEntries.getEntries().size());
281         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
282         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
283         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
284         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
285     }
286
287     @Test
288     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
289         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
290
291         MockRaftActorContext actorContext = createActorContextWithFollower();
292         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
293             @Override
294             public FiniteDuration getHeartBeatInterval() {
295                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
296             }
297         });
298
299         long term = 1;
300         actorContext.getTermInformation().update(term, "");
301
302         leader = new Leader(actorContext);
303
304         // Leader will send an immediate heartbeat - ignore it.
305         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
306
307         // The follower would normally reply - simulate that explicitly here.
308         long lastIndex = actorContext.getReplicatedLog().lastIndex();
309         leader.handleMessage(followerActor, new AppendEntriesReply(
310                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
311         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
312
313         followerActor.underlyingActor().clear();
314
315         for(int i=0;i<5;i++) {
316             sendReplicate(actorContext, lastIndex+i+1);
317         }
318
319         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
320         // We expect only 1 message to be sent because of two reasons,
321         // - an append entries reply was not received
322         // - the heartbeat interval has not expired
323         // In this scenario if multiple messages are sent they would likely be duplicates
324         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
325     }
326
327     @Test
328     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
329         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
330
331         MockRaftActorContext actorContext = createActorContextWithFollower();
332         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
333             @Override
334             public FiniteDuration getHeartBeatInterval() {
335                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
336             }
337         });
338
339         long term = 1;
340         actorContext.getTermInformation().update(term, "");
341
342         leader = new Leader(actorContext);
343
344         // Leader will send an immediate heartbeat - ignore it.
345         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
346
347         // The follower would normally reply - simulate that explicitly here.
348         long lastIndex = actorContext.getReplicatedLog().lastIndex();
349         leader.handleMessage(followerActor, new AppendEntriesReply(
350                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
351         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
352
353         followerActor.underlyingActor().clear();
354
355         for(int i=0;i<3;i++) {
356             sendReplicate(actorContext, lastIndex+i+1);
357             leader.handleMessage(followerActor, new AppendEntriesReply(
358                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
359
360         }
361
362         for(int i=3;i<5;i++) {
363             sendReplicate(actorContext, lastIndex + i + 1);
364         }
365
366         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
367         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
368         // get sent to the follower - but not the 5th
369         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
370
371         for(int i=0;i<4;i++) {
372             long expected = allMessages.get(i).getEntries().get(0).getIndex();
373             assertEquals(expected, i+2);
374         }
375     }
376
377     @Test
378     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
379         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
380
381         MockRaftActorContext actorContext = createActorContextWithFollower();
382         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
383             @Override
384             public FiniteDuration getHeartBeatInterval() {
385                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
386             }
387         });
388
389         long term = 1;
390         actorContext.getTermInformation().update(term, "");
391
392         leader = new Leader(actorContext);
393
394         // Leader will send an immediate heartbeat - ignore it.
395         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
396
397         // The follower would normally reply - simulate that explicitly here.
398         long lastIndex = actorContext.getReplicatedLog().lastIndex();
399         leader.handleMessage(followerActor, new AppendEntriesReply(
400                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
401         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
402
403         followerActor.underlyingActor().clear();
404
405         sendReplicate(actorContext, lastIndex+1);
406
407         // Wait slightly longer than heartbeat duration
408         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
409
410         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
411
412         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
413         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
414
415         assertEquals(1, allMessages.get(0).getEntries().size());
416         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
417         assertEquals(1, allMessages.get(1).getEntries().size());
418         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
419
420     }
421
422     @Test
423     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
424         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
425
426         MockRaftActorContext actorContext = createActorContextWithFollower();
427         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
428             @Override
429             public FiniteDuration getHeartBeatInterval() {
430                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
431             }
432         });
433
434         long term = 1;
435         actorContext.getTermInformation().update(term, "");
436
437         leader = new Leader(actorContext);
438
439         // Leader will send an immediate heartbeat - ignore it.
440         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
441
442         // The follower would normally reply - simulate that explicitly here.
443         long lastIndex = actorContext.getReplicatedLog().lastIndex();
444         leader.handleMessage(followerActor, new AppendEntriesReply(
445                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
446         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
447
448         followerActor.underlyingActor().clear();
449
450         for(int i=0;i<3;i++) {
451             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
452             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
453         }
454
455         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
456         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
457     }
458
459     @Test
460     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
461         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
462
463         MockRaftActorContext actorContext = createActorContextWithFollower();
464         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
465             @Override
466             public FiniteDuration getHeartBeatInterval() {
467                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
468             }
469         });
470
471         long term = 1;
472         actorContext.getTermInformation().update(term, "");
473
474         leader = new Leader(actorContext);
475
476         // Leader will send an immediate heartbeat - ignore it.
477         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478
479         // The follower would normally reply - simulate that explicitly here.
480         long lastIndex = actorContext.getReplicatedLog().lastIndex();
481         leader.handleMessage(followerActor, new AppendEntriesReply(
482                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
483         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
484
485         followerActor.underlyingActor().clear();
486
487         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
488         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
489         sendReplicate(actorContext, lastIndex+1);
490
491         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
492         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
493
494         assertEquals(0, allMessages.get(0).getEntries().size());
495         assertEquals(1, allMessages.get(1).getEntries().size());
496     }
497
498
499     @Test
500     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
501         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
502
503         MockRaftActorContext actorContext = createActorContext();
504
505         leader = new Leader(actorContext);
506
507         actorContext.setLastApplied(0);
508
509         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
510         long term = actorContext.getTermInformation().getCurrentTerm();
511         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
512                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
513
514         actorContext.getReplicatedLog().append(newEntry);
515
516         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
517                 new Replicate(leaderActor, "state-id", newEntry));
518
519         // State should not change
520         assertTrue(raftBehavior instanceof Leader);
521
522         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
523
524         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
525         // one since lastApplied state is 0.
526         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
527                 leaderActor, ApplyState.class);
528         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
529
530         for(int i = 0; i <= newLogIndex - 1; i++ ) {
531             ApplyState applyState = applyStateList.get(i);
532             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
533             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
534         }
535
536         ApplyState last = applyStateList.get((int) newLogIndex - 1);
537         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
538         assertEquals("getIdentifier", "state-id", last.getIdentifier());
539     }
540
541     @Test
542     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
543         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
544
545         MockRaftActorContext actorContext = createActorContextWithFollower();
546
547         Map<String, String> leadersSnapshot = new HashMap<>();
548         leadersSnapshot.put("1", "A");
549         leadersSnapshot.put("2", "B");
550         leadersSnapshot.put("3", "C");
551
552         //clears leaders log
553         actorContext.getReplicatedLog().removeFrom(0);
554
555         final int commitIndex = 3;
556         final int snapshotIndex = 2;
557         final int newEntryIndex = 4;
558         final int snapshotTerm = 1;
559         final int currentTerm = 2;
560
561         // set the snapshot variables in replicatedlog
562         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
563         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
564         actorContext.setCommitIndex(commitIndex);
565         //set follower timeout to 2 mins, helps during debugging
566         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
567
568         leader = new Leader(actorContext);
569
570         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
571         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
572
573         // new entry
574         ReplicatedLogImplEntry entry =
575                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
576                         new MockRaftActorContext.MockPayload("D"));
577
578         //update follower timestamp
579         leader.markFollowerActive(FOLLOWER_ID);
580
581         ByteString bs = toByteString(leadersSnapshot);
582         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
583                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
584         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
585         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
586
587         //send first chunk and no InstallSnapshotReply received yet
588         fts.getNextChunk();
589         fts.incrementChunkIndex();
590
591         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
592                 TimeUnit.MILLISECONDS);
593
594         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
595
596         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
597
598         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
599
600         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
601
602         //InstallSnapshotReply received
603         fts.markSendStatus(true);
604
605         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
606
607         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
608
609         assertEquals(commitIndex, is.getLastIncludedIndex());
610     }
611
612     @Test
613     public void testSendAppendEntriesSnapshotScenario() throws Exception {
614         logStart("testSendAppendEntriesSnapshotScenario");
615
616         MockRaftActorContext actorContext = createActorContextWithFollower();
617
618         Map<String, String> leadersSnapshot = new HashMap<>();
619         leadersSnapshot.put("1", "A");
620         leadersSnapshot.put("2", "B");
621         leadersSnapshot.put("3", "C");
622
623         //clears leaders log
624         actorContext.getReplicatedLog().removeFrom(0);
625
626         final int followersLastIndex = 2;
627         final int snapshotIndex = 3;
628         final int newEntryIndex = 4;
629         final int snapshotTerm = 1;
630         final int currentTerm = 2;
631
632         // set the snapshot variables in replicatedlog
633         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
634         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
635         actorContext.setCommitIndex(followersLastIndex);
636
637         leader = new Leader(actorContext);
638
639         // Leader will send an immediate heartbeat - ignore it.
640         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
641
642         // new entry
643         ReplicatedLogImplEntry entry =
644                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
645                         new MockRaftActorContext.MockPayload("D"));
646
647         actorContext.getReplicatedLog().append(entry);
648
649         //update follower timestamp
650         leader.markFollowerActive(FOLLOWER_ID);
651
652         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
653         RaftActorBehavior raftBehavior = leader.handleMessage(
654                 leaderActor, new Replicate(null, "state-id", entry));
655
656         assertTrue(raftBehavior instanceof Leader);
657
658         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
659     }
660
661     @Test
662     public void testInitiateInstallSnapshot() throws Exception {
663         logStart("testInitiateInstallSnapshot");
664
665         MockRaftActorContext actorContext = createActorContextWithFollower();
666
667         //clears leaders log
668         actorContext.getReplicatedLog().removeFrom(0);
669
670         final int followersLastIndex = 2;
671         final int snapshotIndex = 3;
672         final int newEntryIndex = 4;
673         final int snapshotTerm = 1;
674         final int currentTerm = 2;
675
676         // set the snapshot variables in replicatedlog
677         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
678         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
679         actorContext.setLastApplied(3);
680         actorContext.setCommitIndex(followersLastIndex);
681
682         leader = new Leader(actorContext);
683
684         // Leader will send an immediate heartbeat - ignore it.
685         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
686
687         // set the snapshot as absent and check if capture-snapshot is invoked.
688         leader.setSnapshot(null);
689
690         // new entry
691         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
692                 new MockRaftActorContext.MockPayload("D"));
693
694         actorContext.getReplicatedLog().append(entry);
695
696         //update follower timestamp
697         leader.markFollowerActive(FOLLOWER_ID);
698
699         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
700
701         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
702
703         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
704
705         assertTrue(cs.isInstallSnapshotInitiated());
706         assertEquals(3, cs.getLastAppliedIndex());
707         assertEquals(1, cs.getLastAppliedTerm());
708         assertEquals(4, cs.getLastIndex());
709         assertEquals(2, cs.getLastTerm());
710
711         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
712         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
713
714         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
715     }
716
717     @Test
718     public void testInitiateForceInstallSnapshot() throws Exception {
719         logStart("testInitiateForceInstallSnapshot");
720
721         MockRaftActorContext actorContext = createActorContextWithFollower();
722
723         final int followersLastIndex = 2;
724         final int snapshotIndex = -1;
725         final int newEntryIndex = 4;
726         final int snapshotTerm = -1;
727         final int currentTerm = 2;
728
729         // set the snapshot variables in replicatedlog
730         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
731         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
732         actorContext.setLastApplied(3);
733         actorContext.setCommitIndex(followersLastIndex);
734
735         actorContext.getReplicatedLog().removeFrom(0);
736
737         leader = new Leader(actorContext);
738
739         // Leader will send an immediate heartbeat - ignore it.
740         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
741
742         // set the snapshot as absent and check if capture-snapshot is invoked.
743         leader.setSnapshot(null);
744
745         for(int i=0;i<4;i++) {
746             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
747                     new MockRaftActorContext.MockPayload("X" + i)));
748         }
749
750         // new entry
751         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
752                 new MockRaftActorContext.MockPayload("D"));
753
754         actorContext.getReplicatedLog().append(entry);
755
756         //update follower timestamp
757         leader.markFollowerActive(FOLLOWER_ID);
758
759         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
760         // installed with a SendInstallSnapshot
761         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
762
763         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
764
765         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
766
767         assertTrue(cs.isInstallSnapshotInitiated());
768         assertEquals(3, cs.getLastAppliedIndex());
769         assertEquals(1, cs.getLastAppliedTerm());
770         assertEquals(4, cs.getLastIndex());
771         assertEquals(2, cs.getLastTerm());
772
773         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
774         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
775
776         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
777     }
778
779
780     @Test
781     public void testInstallSnapshot() throws Exception {
782         logStart("testInstallSnapshot");
783
784         MockRaftActorContext actorContext = createActorContextWithFollower();
785
786         Map<String, String> leadersSnapshot = new HashMap<>();
787         leadersSnapshot.put("1", "A");
788         leadersSnapshot.put("2", "B");
789         leadersSnapshot.put("3", "C");
790
791         //clears leaders log
792         actorContext.getReplicatedLog().removeFrom(0);
793
794         final int lastAppliedIndex = 3;
795         final int snapshotIndex = 2;
796         final int snapshotTerm = 1;
797         final int currentTerm = 2;
798
799         // set the snapshot variables in replicatedlog
800         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
801         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
802         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
803         actorContext.setCommitIndex(lastAppliedIndex);
804         actorContext.setLastApplied(lastAppliedIndex);
805
806         leader = new Leader(actorContext);
807
808         // Initial heartbeat.
809         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
810
811         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
812         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
813
814         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
815                 Collections.<ReplicatedLogEntry>emptyList(),
816                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
817
818         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
819
820         assertTrue(raftBehavior instanceof Leader);
821
822         // check if installsnapshot gets called with the correct values.
823
824         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825
826         assertNotNull(installSnapshot.getData());
827         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
828         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
829
830         assertEquals(currentTerm, installSnapshot.getTerm());
831     }
832
833     @Test
834     public void testForceInstallSnapshot() throws Exception {
835         logStart("testForceInstallSnapshot");
836
837         MockRaftActorContext actorContext = createActorContextWithFollower();
838
839         Map<String, String> leadersSnapshot = new HashMap<>();
840         leadersSnapshot.put("1", "A");
841         leadersSnapshot.put("2", "B");
842         leadersSnapshot.put("3", "C");
843
844         final int lastAppliedIndex = 3;
845         final int snapshotIndex = -1;
846         final int snapshotTerm = -1;
847         final int currentTerm = 2;
848
849         // set the snapshot variables in replicatedlog
850         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
851         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
852         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
853         actorContext.setCommitIndex(lastAppliedIndex);
854         actorContext.setLastApplied(lastAppliedIndex);
855
856         leader = new Leader(actorContext);
857
858         // Initial heartbeat.
859         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
860
861         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
862         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
863
864         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
865                 Collections.<ReplicatedLogEntry>emptyList(),
866                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
867
868         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
869
870         assertTrue(raftBehavior instanceof Leader);
871
872         // check if installsnapshot gets called with the correct values.
873
874         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
875
876         assertNotNull(installSnapshot.getData());
877         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
878         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
879
880         assertEquals(currentTerm, installSnapshot.getTerm());
881     }
882
883     @Test
884     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
885         logStart("testHandleInstallSnapshotReplyLastChunk");
886
887         MockRaftActorContext actorContext = createActorContextWithFollower();
888
889         final int commitIndex = 3;
890         final int snapshotIndex = 2;
891         final int snapshotTerm = 1;
892         final int currentTerm = 2;
893
894         actorContext.setCommitIndex(commitIndex);
895
896         leader = new Leader(actorContext);
897
898         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
899         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
900
901         // Ignore initial heartbeat.
902         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
903
904         Map<String, String> leadersSnapshot = new HashMap<>();
905         leadersSnapshot.put("1", "A");
906         leadersSnapshot.put("2", "B");
907         leadersSnapshot.put("3", "C");
908
909         // set the snapshot variables in replicatedlog
910
911         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
912         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
913         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
914
915         ByteString bs = toByteString(leadersSnapshot);
916         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
917                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
918         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
919         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
920         while(!fts.isLastChunk(fts.getChunkIndex())) {
921             fts.getNextChunk();
922             fts.incrementChunkIndex();
923         }
924
925         //clears leaders log
926         actorContext.getReplicatedLog().removeFrom(0);
927
928         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
929                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
930
931         assertTrue(raftBehavior instanceof Leader);
932
933         assertEquals(0, leader.followerSnapshotSize());
934         assertEquals(1, leader.followerLogSize());
935         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
936         assertNotNull(fli);
937         assertEquals(commitIndex, fli.getMatchIndex());
938         assertEquals(commitIndex + 1, fli.getNextIndex());
939     }
940
941     @Test
942     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
943         logStart("testSendSnapshotfromInstallSnapshotReply");
944
945         MockRaftActorContext actorContext = createActorContextWithFollower();
946
947         final int commitIndex = 3;
948         final int snapshotIndex = 2;
949         final int snapshotTerm = 1;
950         final int currentTerm = 2;
951
952         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
953             @Override
954             public int getSnapshotChunkSize() {
955                 return 50;
956             }
957         };
958         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
959         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
960
961         actorContext.setConfigParams(configParams);
962         actorContext.setCommitIndex(commitIndex);
963
964         leader = new Leader(actorContext);
965
966         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
967         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
968
969         Map<String, String> leadersSnapshot = new HashMap<>();
970         leadersSnapshot.put("1", "A");
971         leadersSnapshot.put("2", "B");
972         leadersSnapshot.put("3", "C");
973
974         // set the snapshot variables in replicatedlog
975         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
976         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
977         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
978
979         ByteString bs = toByteString(leadersSnapshot);
980         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
981                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
982         leader.setSnapshot(snapshot);
983
984         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
985
986         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
987
988         assertEquals(1, installSnapshot.getChunkIndex());
989         assertEquals(3, installSnapshot.getTotalChunks());
990
991         followerActor.underlyingActor().clear();
992         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
993                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
994
995         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
996
997         assertEquals(2, installSnapshot.getChunkIndex());
998         assertEquals(3, installSnapshot.getTotalChunks());
999
1000         followerActor.underlyingActor().clear();
1001         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1002                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1003
1004         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1005
1006         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1007         followerActor.underlyingActor().clear();
1008         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1010
1011         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1012
1013         Assert.assertNull(installSnapshot);
1014     }
1015
1016
1017     @Test
1018     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1019         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1020
1021         MockRaftActorContext actorContext = createActorContextWithFollower();
1022
1023         final int commitIndex = 3;
1024         final int snapshotIndex = 2;
1025         final int snapshotTerm = 1;
1026         final int currentTerm = 2;
1027
1028         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1029             @Override
1030             public int getSnapshotChunkSize() {
1031                 return 50;
1032             }
1033         });
1034
1035         actorContext.setCommitIndex(commitIndex);
1036
1037         leader = new Leader(actorContext);
1038
1039         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1040         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1041
1042         Map<String, String> leadersSnapshot = new HashMap<>();
1043         leadersSnapshot.put("1", "A");
1044         leadersSnapshot.put("2", "B");
1045         leadersSnapshot.put("3", "C");
1046
1047         // set the snapshot variables in replicatedlog
1048         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1049         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1050         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1051
1052         ByteString bs = toByteString(leadersSnapshot);
1053         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1054                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1055         leader.setSnapshot(snapshot);
1056
1057         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1058         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1059
1060         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1061
1062         assertEquals(1, installSnapshot.getChunkIndex());
1063         assertEquals(3, installSnapshot.getTotalChunks());
1064
1065         followerActor.underlyingActor().clear();
1066
1067         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1068                 FOLLOWER_ID, -1, false));
1069
1070         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1071                 TimeUnit.MILLISECONDS);
1072
1073         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1074
1075         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1076
1077         assertEquals(1, installSnapshot.getChunkIndex());
1078         assertEquals(3, installSnapshot.getTotalChunks());
1079     }
1080
1081     @Test
1082     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1083         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1084
1085         MockRaftActorContext actorContext = createActorContextWithFollower();
1086
1087         final int commitIndex = 3;
1088         final int snapshotIndex = 2;
1089         final int snapshotTerm = 1;
1090         final int currentTerm = 2;
1091
1092         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1093             @Override
1094             public int getSnapshotChunkSize() {
1095                 return 50;
1096             }
1097         });
1098
1099         actorContext.setCommitIndex(commitIndex);
1100
1101         leader = new Leader(actorContext);
1102
1103         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1104         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1105
1106         Map<String, String> leadersSnapshot = new HashMap<>();
1107         leadersSnapshot.put("1", "A");
1108         leadersSnapshot.put("2", "B");
1109         leadersSnapshot.put("3", "C");
1110
1111         // set the snapshot variables in replicatedlog
1112         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1113         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1114         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1115
1116         ByteString bs = toByteString(leadersSnapshot);
1117         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1118                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1119         leader.setSnapshot(snapshot);
1120
1121         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1122
1123         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1124
1125         assertEquals(1, installSnapshot.getChunkIndex());
1126         assertEquals(3, installSnapshot.getTotalChunks());
1127         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1128
1129         int hashCode = Arrays.hashCode(installSnapshot.getData());
1130
1131         followerActor.underlyingActor().clear();
1132
1133         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1134                 FOLLOWER_ID, 1, true));
1135
1136         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1137
1138         assertEquals(2, installSnapshot.getChunkIndex());
1139         assertEquals(3, installSnapshot.getTotalChunks());
1140         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1141     }
1142
1143     @Test
1144     public void testFollowerToSnapshotLogic() {
1145         logStart("testFollowerToSnapshotLogic");
1146
1147         MockRaftActorContext actorContext = createActorContext();
1148
1149         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1150             @Override
1151             public int getSnapshotChunkSize() {
1152                 return 50;
1153             }
1154         });
1155
1156         leader = new Leader(actorContext);
1157
1158         Map<String, String> leadersSnapshot = new HashMap<>();
1159         leadersSnapshot.put("1", "A");
1160         leadersSnapshot.put("2", "B");
1161         leadersSnapshot.put("3", "C");
1162
1163         ByteString bs = toByteString(leadersSnapshot);
1164         byte[] barray = bs.toByteArray();
1165
1166         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1167         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1168
1169         assertEquals(bs.size(), barray.length);
1170
1171         int chunkIndex=0;
1172         for (int i=0; i < barray.length; i = i + 50) {
1173             int j = i + 50;
1174             chunkIndex++;
1175
1176             if (i + 50 > barray.length) {
1177                 j = barray.length;
1178             }
1179
1180             byte[] chunk = fts.getNextChunk();
1181             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1182             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1183
1184             fts.markSendStatus(true);
1185             if (!fts.isLastChunk(chunkIndex)) {
1186                 fts.incrementChunkIndex();
1187             }
1188         }
1189
1190         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1191     }
1192
1193     @Override protected RaftActorBehavior createBehavior(
1194         RaftActorContext actorContext) {
1195         return new Leader(actorContext);
1196     }
1197
1198     @Override
1199     protected MockRaftActorContext createActorContext() {
1200         return createActorContext(leaderActor);
1201     }
1202
1203     @Override
1204     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1205         return createActorContext(LEADER_ID, actorRef);
1206     }
1207
1208     private MockRaftActorContext createActorContextWithFollower() {
1209         MockRaftActorContext actorContext = createActorContext();
1210         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1211                 followerActor.path().toString()).build());
1212         return actorContext;
1213     }
1214
1215     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1216         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1217         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1218         configParams.setElectionTimeoutFactor(100000);
1219         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1220         context.setConfigParams(configParams);
1221         context.setPayloadVersion(payloadVersion);
1222         return context;
1223     }
1224
1225     private MockRaftActorContext createFollowerActorContextWithLeader() {
1226         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1227         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1228         followerConfig.setElectionTimeoutFactor(10000);
1229         followerActorContext.setConfigParams(followerConfig);
1230         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1231         return followerActorContext;
1232     }
1233
1234     @Test
1235     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1236         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1237
1238         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1239
1240         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1241
1242         Follower follower = new Follower(followerActorContext);
1243         followerActor.underlyingActor().setBehavior(follower);
1244
1245         Map<String, String> peerAddresses = new HashMap<>();
1246         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1247
1248         leaderActorContext.setPeerAddresses(peerAddresses);
1249
1250         leaderActorContext.getReplicatedLog().removeFrom(0);
1251
1252         //create 3 entries
1253         leaderActorContext.setReplicatedLog(
1254                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1255
1256         leaderActorContext.setCommitIndex(1);
1257
1258         followerActorContext.getReplicatedLog().removeFrom(0);
1259
1260         // follower too has the exact same log entries and has the same commit index
1261         followerActorContext.setReplicatedLog(
1262                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1263
1264         followerActorContext.setCommitIndex(1);
1265
1266         leader = new Leader(leaderActorContext);
1267
1268         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1269
1270         assertEquals(1, appendEntries.getLeaderCommit());
1271         assertEquals(0, appendEntries.getEntries().size());
1272         assertEquals(0, appendEntries.getPrevLogIndex());
1273
1274         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1275                 leaderActor, AppendEntriesReply.class);
1276
1277         assertEquals(2, appendEntriesReply.getLogLastIndex());
1278         assertEquals(1, appendEntriesReply.getLogLastTerm());
1279
1280         // follower returns its next index
1281         assertEquals(2, appendEntriesReply.getLogLastIndex());
1282         assertEquals(1, appendEntriesReply.getLogLastTerm());
1283
1284         follower.close();
1285     }
1286
1287     @Test
1288     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1289         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1290
1291         MockRaftActorContext leaderActorContext = createActorContext();
1292
1293         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1294         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1295
1296         Follower follower = new Follower(followerActorContext);
1297         followerActor.underlyingActor().setBehavior(follower);
1298
1299         Map<String, String> leaderPeerAddresses = new HashMap<>();
1300         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1301
1302         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1303
1304         leaderActorContext.getReplicatedLog().removeFrom(0);
1305
1306         leaderActorContext.setReplicatedLog(
1307                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1308
1309         leaderActorContext.setCommitIndex(1);
1310
1311         followerActorContext.getReplicatedLog().removeFrom(0);
1312
1313         followerActorContext.setReplicatedLog(
1314                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315
1316         // follower has the same log entries but its commit index > leaders commit index
1317         followerActorContext.setCommitIndex(2);
1318
1319         leader = new Leader(leaderActorContext);
1320
1321         // Initial heartbeat
1322         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1323
1324         assertEquals(1, appendEntries.getLeaderCommit());
1325         assertEquals(0, appendEntries.getEntries().size());
1326         assertEquals(0, appendEntries.getPrevLogIndex());
1327
1328         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1329                 leaderActor, AppendEntriesReply.class);
1330
1331         assertEquals(2, appendEntriesReply.getLogLastIndex());
1332         assertEquals(1, appendEntriesReply.getLogLastTerm());
1333
1334         leaderActor.underlyingActor().setBehavior(follower);
1335         leader.handleMessage(followerActor, appendEntriesReply);
1336
1337         leaderActor.underlyingActor().clear();
1338         followerActor.underlyingActor().clear();
1339
1340         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1341                 TimeUnit.MILLISECONDS);
1342
1343         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1344
1345         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1346
1347         assertEquals(2, appendEntries.getLeaderCommit());
1348         assertEquals(0, appendEntries.getEntries().size());
1349         assertEquals(2, appendEntries.getPrevLogIndex());
1350
1351         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1352
1353         assertEquals(2, appendEntriesReply.getLogLastIndex());
1354         assertEquals(1, appendEntriesReply.getLogLastTerm());
1355
1356         assertEquals(2, followerActorContext.getCommitIndex());
1357
1358         follower.close();
1359     }
1360
1361     @Test
1362     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1363         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1364
1365         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1366         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1367                 new FiniteDuration(1000, TimeUnit.SECONDS));
1368
1369         leaderActorContext.setReplicatedLog(
1370                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1371         long leaderCommitIndex = 2;
1372         leaderActorContext.setCommitIndex(leaderCommitIndex);
1373         leaderActorContext.setLastApplied(leaderCommitIndex);
1374
1375         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1376         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1377
1378         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1379
1380         followerActorContext.setReplicatedLog(
1381                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1382         followerActorContext.setCommitIndex(0);
1383         followerActorContext.setLastApplied(0);
1384
1385         Follower follower = new Follower(followerActorContext);
1386         followerActor.underlyingActor().setBehavior(follower);
1387
1388         leader = new Leader(leaderActorContext);
1389
1390         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1391         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1392
1393         MessageCollectorActor.clearMessages(followerActor);
1394         MessageCollectorActor.clearMessages(leaderActor);
1395
1396         // Verify initial AppendEntries sent with the leader's current commit index.
1397         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1398         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1399         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1400
1401         leaderActor.underlyingActor().setBehavior(leader);
1402
1403         leader.handleMessage(followerActor, appendEntriesReply);
1404
1405         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1406         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1407
1408         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1409         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1410         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1411
1412         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1413         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1414                 appendEntries.getEntries().get(0).getData());
1415         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1416         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1417                 appendEntries.getEntries().get(1).getData());
1418
1419         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1420         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1421
1422         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1423
1424         ApplyState applyState = applyStateList.get(0);
1425         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1426         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1427         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1428                 applyState.getReplicatedLogEntry().getData());
1429
1430         applyState = applyStateList.get(1);
1431         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1432         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1433         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1434                 applyState.getReplicatedLogEntry().getData());
1435
1436         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1437         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1438     }
1439
1440     @Test
1441     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1442         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1443
1444         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1445         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1446                 new FiniteDuration(1000, TimeUnit.SECONDS));
1447
1448         leaderActorContext.setReplicatedLog(
1449                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1450         long leaderCommitIndex = 1;
1451         leaderActorContext.setCommitIndex(leaderCommitIndex);
1452         leaderActorContext.setLastApplied(leaderCommitIndex);
1453
1454         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1455         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1456
1457         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1458
1459         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1460         followerActorContext.setCommitIndex(-1);
1461         followerActorContext.setLastApplied(-1);
1462
1463         Follower follower = new Follower(followerActorContext);
1464         followerActor.underlyingActor().setBehavior(follower);
1465
1466         leader = new Leader(leaderActorContext);
1467
1468         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1469         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1470
1471         MessageCollectorActor.clearMessages(followerActor);
1472         MessageCollectorActor.clearMessages(leaderActor);
1473
1474         // Verify initial AppendEntries sent with the leader's current commit index.
1475         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1476         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1477         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1478
1479         leaderActor.underlyingActor().setBehavior(leader);
1480
1481         leader.handleMessage(followerActor, appendEntriesReply);
1482
1483         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1484         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1485
1486         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1487         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1488         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1489
1490         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1491         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1492                 appendEntries.getEntries().get(0).getData());
1493         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1494         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1495                 appendEntries.getEntries().get(1).getData());
1496
1497         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1498         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1499
1500         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1501
1502         ApplyState applyState = applyStateList.get(0);
1503         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1504         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1505         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1506                 applyState.getReplicatedLogEntry().getData());
1507
1508         applyState = applyStateList.get(1);
1509         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1510         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1511         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1512                 applyState.getReplicatedLogEntry().getData());
1513
1514         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1515         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1516     }
1517
1518     @Test
1519     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1520         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1521
1522         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1523         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1524                 new FiniteDuration(1000, TimeUnit.SECONDS));
1525
1526         leaderActorContext.setReplicatedLog(
1527                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1528         long leaderCommitIndex = 1;
1529         leaderActorContext.setCommitIndex(leaderCommitIndex);
1530         leaderActorContext.setLastApplied(leaderCommitIndex);
1531
1532         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1533         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1534
1535         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1536
1537         followerActorContext.setReplicatedLog(
1538                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1539         followerActorContext.setCommitIndex(-1);
1540         followerActorContext.setLastApplied(-1);
1541
1542         Follower follower = new Follower(followerActorContext);
1543         followerActor.underlyingActor().setBehavior(follower);
1544
1545         leader = new Leader(leaderActorContext);
1546
1547         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1548         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1549
1550         MessageCollectorActor.clearMessages(followerActor);
1551         MessageCollectorActor.clearMessages(leaderActor);
1552
1553         // Verify initial AppendEntries sent with the leader's current commit index.
1554         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1555         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1556         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1557
1558         leaderActor.underlyingActor().setBehavior(leader);
1559
1560         leader.handleMessage(followerActor, appendEntriesReply);
1561
1562         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1563         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1564
1565         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1566         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1567         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1568
1569         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1570         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1571         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1572                 appendEntries.getEntries().get(0).getData());
1573         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1574         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1575         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1576                 appendEntries.getEntries().get(1).getData());
1577
1578         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1579         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1580
1581         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1582
1583         ApplyState applyState = applyStateList.get(0);
1584         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1585         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1586         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1587                 applyState.getReplicatedLogEntry().getData());
1588
1589         applyState = applyStateList.get(1);
1590         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1591         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1592         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1593                 applyState.getReplicatedLogEntry().getData());
1594
1595         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1596         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1597         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1598     }
1599
1600     @Test
1601     public void testHandleAppendEntriesReplyWithNewerTerm(){
1602         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1603
1604         MockRaftActorContext leaderActorContext = createActorContext();
1605         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1606                 new FiniteDuration(10000, TimeUnit.SECONDS));
1607
1608         leaderActorContext.setReplicatedLog(
1609                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1610
1611         leader = new Leader(leaderActorContext);
1612         leaderActor.underlyingActor().setBehavior(leader);
1613         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1614
1615         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1616
1617         assertEquals(false, appendEntriesReply.isSuccess());
1618         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1619
1620         MessageCollectorActor.clearMessages(leaderActor);
1621     }
1622
1623     @Test
1624     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1625         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1626
1627         MockRaftActorContext leaderActorContext = createActorContext();
1628         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1629                 new FiniteDuration(10000, TimeUnit.SECONDS));
1630
1631         leaderActorContext.setReplicatedLog(
1632                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1633         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1634
1635         leader = new Leader(leaderActorContext);
1636         leaderActor.underlyingActor().setBehavior(leader);
1637         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1638
1639         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1640
1641         assertEquals(false, appendEntriesReply.isSuccess());
1642         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1643
1644         MessageCollectorActor.clearMessages(leaderActor);
1645     }
1646
1647     @Test
1648     public void testHandleAppendEntriesReplySuccess() throws Exception {
1649         logStart("testHandleAppendEntriesReplySuccess");
1650
1651         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1652
1653         leaderActorContext.setReplicatedLog(
1654                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1655
1656         leaderActorContext.setCommitIndex(1);
1657         leaderActorContext.setLastApplied(1);
1658         leaderActorContext.getTermInformation().update(1, "leader");
1659
1660         leader = new Leader(leaderActorContext);
1661
1662         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1663
1664         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1665         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1666
1667         short payloadVersion = 5;
1668         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1669
1670         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1671
1672         assertEquals(RaftState.Leader, raftActorBehavior.state());
1673
1674         assertEquals(2, leaderActorContext.getCommitIndex());
1675
1676         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1677                 leaderActor, ApplyJournalEntries.class);
1678
1679         assertEquals(2, leaderActorContext.getLastApplied());
1680
1681         assertEquals(2, applyJournalEntries.getToIndex());
1682
1683         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1684                 ApplyState.class);
1685
1686         assertEquals(1,applyStateList.size());
1687
1688         ApplyState applyState = applyStateList.get(0);
1689
1690         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1691
1692         assertEquals(2, followerInfo.getMatchIndex());
1693         assertEquals(3, followerInfo.getNextIndex());
1694         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1695         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1696     }
1697
1698     @Test
1699     public void testHandleAppendEntriesReplyUnknownFollower(){
1700         logStart("testHandleAppendEntriesReplyUnknownFollower");
1701
1702         MockRaftActorContext leaderActorContext = createActorContext();
1703
1704         leader = new Leader(leaderActorContext);
1705
1706         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1707
1708         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1709
1710         assertEquals(RaftState.Leader, raftActorBehavior.state());
1711     }
1712
1713     @Test
1714     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1715         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1716
1717         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1718         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1719                 new FiniteDuration(1000, TimeUnit.SECONDS));
1720         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1721
1722         leaderActorContext.setReplicatedLog(
1723                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1724         long leaderCommitIndex = 3;
1725         leaderActorContext.setCommitIndex(leaderCommitIndex);
1726         leaderActorContext.setLastApplied(leaderCommitIndex);
1727
1728         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1729         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1730         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1731         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1732
1733         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1734
1735         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1736         followerActorContext.setCommitIndex(-1);
1737         followerActorContext.setLastApplied(-1);
1738
1739         Follower follower = new Follower(followerActorContext);
1740         followerActor.underlyingActor().setBehavior(follower);
1741
1742         leader = new Leader(leaderActorContext);
1743
1744         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1745         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1746
1747         MessageCollectorActor.clearMessages(followerActor);
1748         MessageCollectorActor.clearMessages(leaderActor);
1749
1750         // Verify initial AppendEntries sent with the leader's current commit index.
1751         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1752         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1753         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1754
1755         leaderActor.underlyingActor().setBehavior(leader);
1756
1757         leader.handleMessage(followerActor, appendEntriesReply);
1758
1759         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1760         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1761
1762         appendEntries = appendEntriesList.get(0);
1763         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1764         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1765         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1766
1767         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1768         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1769                 appendEntries.getEntries().get(0).getData());
1770         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1771         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1772                 appendEntries.getEntries().get(1).getData());
1773
1774         appendEntries = appendEntriesList.get(1);
1775         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1776         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1777         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1778
1779         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1780         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1781                 appendEntries.getEntries().get(0).getData());
1782         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1783         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1784                 appendEntries.getEntries().get(1).getData());
1785
1786         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1787         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1788
1789         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1790
1791         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1792         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1793     }
1794
1795     @Test
1796     public void testHandleRequestVoteReply(){
1797         logStart("testHandleRequestVoteReply");
1798
1799         MockRaftActorContext leaderActorContext = createActorContext();
1800
1801         leader = new Leader(leaderActorContext);
1802
1803         // Should be a no-op.
1804         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1805                 new RequestVoteReply(1, true));
1806
1807         assertEquals(RaftState.Leader, raftActorBehavior.state());
1808
1809         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1810
1811         assertEquals(RaftState.Leader, raftActorBehavior.state());
1812     }
1813
1814     @Test
1815     public void testIsolatedLeaderCheckNoFollowers() {
1816         logStart("testIsolatedLeaderCheckNoFollowers");
1817
1818         MockRaftActorContext leaderActorContext = createActorContext();
1819
1820         leader = new Leader(leaderActorContext);
1821         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1822         Assert.assertTrue(behavior instanceof Leader);
1823     }
1824
1825     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1826         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1827         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1828
1829         MockRaftActorContext leaderActorContext = createActorContext();
1830
1831         Map<String, String> peerAddresses = new HashMap<>();
1832         peerAddresses.put("follower-1", followerActor1.path().toString());
1833         peerAddresses.put("follower-2", followerActor2.path().toString());
1834
1835         leaderActorContext.setPeerAddresses(peerAddresses);
1836         leaderActorContext.setRaftPolicy(raftPolicy);
1837
1838         leader = new Leader(leaderActorContext);
1839
1840         leader.markFollowerActive("follower-1");
1841         leader.markFollowerActive("follower-2");
1842         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1843         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1844                 behavior instanceof Leader);
1845
1846         // kill 1 follower and verify if that got killed
1847         final JavaTestKit probe = new JavaTestKit(getSystem());
1848         probe.watch(followerActor1);
1849         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1850         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1851         assertEquals(termMsg1.getActor(), followerActor1);
1852
1853         leader.markFollowerInActive("follower-1");
1854         leader.markFollowerActive("follower-2");
1855         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1856         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1857                 behavior instanceof Leader);
1858
1859         // kill 2nd follower and leader should change to Isolated leader
1860         followerActor2.tell(PoisonPill.getInstance(), null);
1861         probe.watch(followerActor2);
1862         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1863         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1864         assertEquals(termMsg2.getActor(), followerActor2);
1865
1866         leader.markFollowerInActive("follower-2");
1867         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1868     }
1869
1870     @Test
1871     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1872         logStart("testIsolatedLeaderCheckTwoFollowers");
1873
1874         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1875
1876         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1877             behavior instanceof IsolatedLeader);
1878     }
1879
1880     @Test
1881     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1882         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1883
1884         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1885
1886         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1887                 behavior instanceof Leader);
1888     }
1889
1890     @Test
1891     public void testLaggingFollowerStarvation() throws Exception {
1892         logStart("testLaggingFollowerStarvation");
1893         new JavaTestKit(getSystem()) {{
1894             String leaderActorId = actorFactory.generateActorId("leader");
1895             String follower1ActorId = actorFactory.generateActorId("follower");
1896             String follower2ActorId = actorFactory.generateActorId("follower");
1897
1898             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1899                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1900             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1901             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1902
1903             MockRaftActorContext leaderActorContext =
1904                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1905
1906             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1907             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1908             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1909
1910             leaderActorContext.setConfigParams(configParams);
1911
1912             leaderActorContext.setReplicatedLog(
1913                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1914
1915             Map<String, String> peerAddresses = new HashMap<>();
1916             peerAddresses.put(follower1ActorId,
1917                     follower1Actor.path().toString());
1918             peerAddresses.put(follower2ActorId,
1919                     follower2Actor.path().toString());
1920
1921             leaderActorContext.setPeerAddresses(peerAddresses);
1922             leaderActorContext.getTermInformation().update(1, leaderActorId);
1923
1924             RaftActorBehavior leader = createBehavior(leaderActorContext);
1925
1926             leaderActor.underlyingActor().setBehavior(leader);
1927
1928             for(int i=1;i<6;i++) {
1929                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1930                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1931                 assertTrue(newBehavior == leader);
1932                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1933             }
1934
1935             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1936             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1937
1938             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1939                     heartbeats.size() > 1);
1940
1941             // Check if follower-2 got AppendEntries during this time and was not starved
1942             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1943
1944             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1945                     appendEntries.size() > 1);
1946
1947         }};
1948     }
1949
1950     @Test
1951     public void testReplicationConsensusWithNonVotingFollower() {
1952         logStart("testReplicationConsensusWithNonVotingFollower");
1953
1954         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1955         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1956                 new FiniteDuration(1000, TimeUnit.SECONDS));
1957
1958         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1959
1960         String nonVotingFollowerId = "nonvoting-follower";
1961         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1962                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1963
1964         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1965
1966         leader = new Leader(leaderActorContext);
1967
1968         // Ignore initial heartbeats
1969         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1970         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1971
1972         MessageCollectorActor.clearMessages(followerActor);
1973         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1974         MessageCollectorActor.clearMessages(leaderActor);
1975
1976         // Send a Replicate message and wait for AppendEntries.
1977         sendReplicate(leaderActorContext, 0);
1978
1979         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1980         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1981
1982         // Send reply only from the voting follower and verify consensus via ApplyState.
1983         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1984
1985         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1986
1987         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1988
1989         MessageCollectorActor.clearMessages(followerActor);
1990         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1991         MessageCollectorActor.clearMessages(leaderActor);
1992
1993         // Send another Replicate message
1994         sendReplicate(leaderActorContext, 1);
1995
1996         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1997         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1998                 AppendEntries.class);
1999         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2000         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2001
2002         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2003         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2004
2005         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2006
2007         // Send reply from the voting follower and verify consensus.
2008         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2009
2010         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2011     }
2012
2013     @Test
2014     public void testTransferLeadershipWithFollowerInSync() {
2015         logStart("testTransferLeadershipWithFollowerInSync");
2016
2017         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2018         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2019                 new FiniteDuration(1000, TimeUnit.SECONDS));
2020         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2021
2022         leader = new Leader(leaderActorContext);
2023
2024         // Initial heartbeat
2025         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2026         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2027         MessageCollectorActor.clearMessages(followerActor);
2028
2029         sendReplicate(leaderActorContext, 0);
2030         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2031
2032         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2033         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2034         MessageCollectorActor.clearMessages(followerActor);
2035
2036         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2037         leader.transferLeadership(mockTransferCohort);
2038
2039         verify(mockTransferCohort, never()).transferComplete();
2040         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2041         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2042
2043         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2044         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2045
2046         // Leader should force an election timeout
2047         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2048
2049         verify(mockTransferCohort).transferComplete();
2050     }
2051
2052     @Test
2053     public void testTransferLeadershipWithEmptyLog() {
2054         logStart("testTransferLeadershipWithEmptyLog");
2055
2056         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2057         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2058                 new FiniteDuration(1000, TimeUnit.SECONDS));
2059         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2060
2061         leader = new Leader(leaderActorContext);
2062
2063         // Initial heartbeat
2064         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2065         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2066         MessageCollectorActor.clearMessages(followerActor);
2067
2068         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2069         leader.transferLeadership(mockTransferCohort);
2070
2071         verify(mockTransferCohort, never()).transferComplete();
2072         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2074
2075         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2076         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2077
2078         // Leader should force an election timeout
2079         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2080
2081         verify(mockTransferCohort).transferComplete();
2082     }
2083
2084     @Test
2085     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2086         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2087
2088         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2089         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2090                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2091
2092         leader = new Leader(leaderActorContext);
2093
2094         // Initial heartbeat
2095         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2096         MessageCollectorActor.clearMessages(followerActor);
2097
2098         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2099         leader.transferLeadership(mockTransferCohort);
2100
2101         verify(mockTransferCohort, never()).transferComplete();
2102
2103         // Sync up the follower.
2104         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2105         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2106         MessageCollectorActor.clearMessages(followerActor);
2107
2108         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2109                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2110         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2111         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2112         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2113
2114         // Leader should force an election timeout
2115         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2116
2117         verify(mockTransferCohort).transferComplete();
2118     }
2119
2120     @Test
2121     public void testTransferLeadershipWithFollowerSyncTimeout() {
2122         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2123
2124         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2125         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2126                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2127         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2128         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2129
2130         leader = new Leader(leaderActorContext);
2131
2132         // Initial heartbeat
2133         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2134         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2135         MessageCollectorActor.clearMessages(followerActor);
2136
2137         sendReplicate(leaderActorContext, 0);
2138         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2139
2140         MessageCollectorActor.clearMessages(followerActor);
2141
2142         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2143         leader.transferLeadership(mockTransferCohort);
2144
2145         verify(mockTransferCohort, never()).transferComplete();
2146
2147         // Send heartbeats to time out the transfer.
2148         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2149             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2150                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2151             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2152         }
2153
2154         verify(mockTransferCohort).abortTransfer();
2155         verify(mockTransferCohort, never()).transferComplete();
2156         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2157     }
2158
2159     @Override
2160     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2161             ActorRef actorRef, RaftRPC rpc) throws Exception {
2162         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2163         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2164     }
2165
2166     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2167
2168         private final long electionTimeOutIntervalMillis;
2169         private final int snapshotChunkSize;
2170
2171         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2172             super();
2173             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2174             this.snapshotChunkSize = snapshotChunkSize;
2175         }
2176
2177         @Override
2178         public FiniteDuration getElectionTimeOutInterval() {
2179             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2180         }
2181
2182         @Override
2183         public int getSnapshotChunkSize() {
2184             return snapshotChunkSize;
2185         }
2186     }
2187 }