BUG-5626: do not allow overriding of RaftActor.handleCommand()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.PoisonPill;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.testkit.JavaTestKit;
24 import akka.testkit.TestActorRef;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
41 import org.opendaylight.controller.cluster.raft.RaftState;
42 import org.opendaylight.controller.cluster.raft.RaftVersions;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
45 import org.opendaylight.controller.cluster.raft.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
52 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
54 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
55 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
60 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
61 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
62 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
63 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
64 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class LeaderTest extends AbstractLeaderTest<Leader> {
69
    // Identifiers used for the follower and the leader in these tests.
    static final String FOLLOWER_ID = "follower";
    public static final String LEADER_ID = "leader";

    // Test actor passed as the sender of leader-side messages (e.g. SendHeartBeat, Replicate);
    // tests also read the ApplyState messages it has collected.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; tests inspect the AppendEntries/InstallSnapshot
    // messages it has collected via MessageCollectorActor.
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Behavior under test; closed in tearDown() when a test has assigned it.
    private Leader leader;
    // Payload version value available to the tests.
    private final short payloadVersion = 5;
81
82     @Override
83     @After
84     public void tearDown() throws Exception {
85         if(leader != null) {
86             leader.close();
87         }
88
89         super.tearDown();
90     }
91
92     @Test
93     public void testHandleMessageForUnknownMessage() throws Exception {
94         logStart("testHandleMessageForUnknownMessage");
95
96         leader = new Leader(createActorContext());
97
98         // handle message should null when it receives an unknown message
99         assertNull(leader.handleMessage(followerActor, "foo"));
100     }
101
102     @Test
103     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
105
106         MockRaftActorContext actorContext = createActorContextWithFollower();
107         short payloadVersion = (short)5;
108         actorContext.setPayloadVersion(payloadVersion);
109
110         long term = 1;
111         actorContext.getTermInformation().update(term, "");
112
113         leader = new Leader(actorContext);
114         actorContext.setCurrentBehavior(leader);
115
116         // Leader should send an immediate heartbeat with no entries as follower is inactive.
117         long lastIndex = actorContext.getReplicatedLog().lastIndex();
118         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119         assertEquals("getTerm", term, appendEntries.getTerm());
120         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
121         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
122         assertEquals("Entries size", 0, appendEntries.getEntries().size());
123         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
124
125         // The follower would normally reply - simulate that explicitly here.
126         leader.handleMessage(followerActor, new AppendEntriesReply(
127                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
128         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
129
130         followerActor.underlyingActor().clear();
131
132         // Sleep for the heartbeat interval so AppendEntries is sent.
133         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
134                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
135
136         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
137
138         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
139         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
140         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
141         assertEquals("Entries size", 1, appendEntries.getEntries().size());
142         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
143         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
144         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
145     }
146
147
148     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
149         return sendReplicate(actorContext, 1, index);
150     }
151
152     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
153         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
154         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
155                 term, index, payload);
156         actorContext.getReplicatedLog().append(newEntry);
157         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
158     }
159
160     @Test
161     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
162         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
163
164         MockRaftActorContext actorContext = createActorContextWithFollower();
165
166         long term = 1;
167         actorContext.getTermInformation().update(term, "");
168
169         leader = new Leader(actorContext);
170
171         // Leader will send an immediate heartbeat - ignore it.
172         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
173
174         // The follower would normally reply - simulate that explicitly here.
175         long lastIndex = actorContext.getReplicatedLog().lastIndex();
176         leader.handleMessage(followerActor, new AppendEntriesReply(
177                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
178         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
179
180         followerActor.underlyingActor().clear();
181
182         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
183
184         // State should not change
185         assertTrue(raftBehavior instanceof Leader);
186
187         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
188         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
189         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
190         assertEquals("Entries size", 1, appendEntries.getEntries().size());
191         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
192         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
193         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
194         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
195     }
196
197     @Test
198     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
199         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
200
201         MockRaftActorContext actorContext = createActorContextWithFollower();
202
203         // The raft context is initialized with a couple log entries. However the commitIndex
204         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
205         // committed and applied. Now it regains leadership with a higher term (2).
206         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
207         long newTerm = prevTerm + 1;
208         actorContext.getTermInformation().update(newTerm, "");
209
210         leader = new Leader(actorContext);
211         actorContext.setCurrentBehavior(leader);
212
213         // Leader will send an immediate heartbeat - ignore it.
214         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215
216         // The follower replies with the leader's current last index and term, simulating that it is
217         // up to date with the leader.
218         long lastIndex = actorContext.getReplicatedLog().lastIndex();
219         leader.handleMessage(followerActor, new AppendEntriesReply(
220                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
221
222         // The commit index should not get updated even though consensus was reached. This is b/c the
223         // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
224         // from previous terms by counting replicas".
225         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
226
227         followerActor.underlyingActor().clear();
228
229         // Now replicate a new entry with the new term 2.
230         long newIndex = lastIndex + 1;
231         sendReplicate(actorContext, newTerm, newIndex);
232
233         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
234         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
235         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
236         assertEquals("Entries size", 1, appendEntries.getEntries().size());
237         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
238         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
239         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
240
241         // The follower replies with success. The leader should now update the commit index to the new index
242         // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
243         // prior entries are committed indirectly".
244         leader.handleMessage(followerActor, new AppendEntriesReply(
245                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
246
247         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
248     }
249
250     @Test
251     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
252         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
253
254         MockRaftActorContext actorContext = createActorContextWithFollower();
255         actorContext.setRaftPolicy(createRaftPolicy(true, true));
256
257         long term = 1;
258         actorContext.getTermInformation().update(term, "");
259
260         leader = new Leader(actorContext);
261
262         // Leader will send an immediate heartbeat - ignore it.
263         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
264
265         // The follower would normally reply - simulate that explicitly here.
266         long lastIndex = actorContext.getReplicatedLog().lastIndex();
267         leader.handleMessage(followerActor, new AppendEntriesReply(
268                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
269         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
270
271         followerActor.underlyingActor().clear();
272
273         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
274
275         // State should not change
276         assertTrue(raftBehavior instanceof Leader);
277
278         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
279         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
280         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
281         assertEquals("Entries size", 1, appendEntries.getEntries().size());
282         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
283         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
284         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
285         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
286     }
287
288     @Test
289     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
290         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
291
292         MockRaftActorContext actorContext = createActorContextWithFollower();
293         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
294             @Override
295             public FiniteDuration getHeartBeatInterval() {
296                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
297             }
298         });
299
300         long term = 1;
301         actorContext.getTermInformation().update(term, "");
302
303         leader = new Leader(actorContext);
304
305         // Leader will send an immediate heartbeat - ignore it.
306         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
307
308         // The follower would normally reply - simulate that explicitly here.
309         long lastIndex = actorContext.getReplicatedLog().lastIndex();
310         leader.handleMessage(followerActor, new AppendEntriesReply(
311                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
312         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
313
314         followerActor.underlyingActor().clear();
315
316         for(int i=0;i<5;i++) {
317             sendReplicate(actorContext, lastIndex+i+1);
318         }
319
320         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
321         // We expect only 1 message to be sent because of two reasons,
322         // - an append entries reply was not received
323         // - the heartbeat interval has not expired
324         // In this scenario if multiple messages are sent they would likely be duplicates
325         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
326     }
327
328     @Test
329     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
330         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
331
332         MockRaftActorContext actorContext = createActorContextWithFollower();
333         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
334             @Override
335             public FiniteDuration getHeartBeatInterval() {
336                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
337             }
338         });
339
340         long term = 1;
341         actorContext.getTermInformation().update(term, "");
342
343         leader = new Leader(actorContext);
344
345         // Leader will send an immediate heartbeat - ignore it.
346         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
347
348         // The follower would normally reply - simulate that explicitly here.
349         long lastIndex = actorContext.getReplicatedLog().lastIndex();
350         leader.handleMessage(followerActor, new AppendEntriesReply(
351                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
352         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
353
354         followerActor.underlyingActor().clear();
355
356         for(int i=0;i<3;i++) {
357             sendReplicate(actorContext, lastIndex+i+1);
358             leader.handleMessage(followerActor, new AppendEntriesReply(
359                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
360
361         }
362
363         for(int i=3;i<5;i++) {
364             sendReplicate(actorContext, lastIndex + i + 1);
365         }
366
367         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
368         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
369         // get sent to the follower - but not the 5th
370         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
371
372         for(int i=0;i<4;i++) {
373             long expected = allMessages.get(i).getEntries().get(0).getIndex();
374             assertEquals(expected, i+2);
375         }
376     }
377
378     @Test
379     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
380         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
381
382         MockRaftActorContext actorContext = createActorContextWithFollower();
383         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
384             @Override
385             public FiniteDuration getHeartBeatInterval() {
386                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
387             }
388         });
389
390         long term = 1;
391         actorContext.getTermInformation().update(term, "");
392
393         leader = new Leader(actorContext);
394
395         // Leader will send an immediate heartbeat - ignore it.
396         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
397
398         // The follower would normally reply - simulate that explicitly here.
399         long lastIndex = actorContext.getReplicatedLog().lastIndex();
400         leader.handleMessage(followerActor, new AppendEntriesReply(
401                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
402         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
403
404         followerActor.underlyingActor().clear();
405
406         sendReplicate(actorContext, lastIndex+1);
407
408         // Wait slightly longer than heartbeat duration
409         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
410
411         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
412
413         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
414         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
415
416         assertEquals(1, allMessages.get(0).getEntries().size());
417         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
418         assertEquals(1, allMessages.get(1).getEntries().size());
419         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
420
421     }
422
423     @Test
424     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
425         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
426
427         MockRaftActorContext actorContext = createActorContextWithFollower();
428         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
429             @Override
430             public FiniteDuration getHeartBeatInterval() {
431                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
432             }
433         });
434
435         long term = 1;
436         actorContext.getTermInformation().update(term, "");
437
438         leader = new Leader(actorContext);
439
440         // Leader will send an immediate heartbeat - ignore it.
441         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
442
443         // The follower would normally reply - simulate that explicitly here.
444         long lastIndex = actorContext.getReplicatedLog().lastIndex();
445         leader.handleMessage(followerActor, new AppendEntriesReply(
446                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
447         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
448
449         followerActor.underlyingActor().clear();
450
451         for(int i=0;i<3;i++) {
452             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
453             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
454         }
455
456         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
457         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
458     }
459
460     @Test
461     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
462         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
463
464         MockRaftActorContext actorContext = createActorContextWithFollower();
465         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
466             @Override
467             public FiniteDuration getHeartBeatInterval() {
468                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
469             }
470         });
471
472         long term = 1;
473         actorContext.getTermInformation().update(term, "");
474
475         leader = new Leader(actorContext);
476
477         // Leader will send an immediate heartbeat - ignore it.
478         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
479
480         // The follower would normally reply - simulate that explicitly here.
481         long lastIndex = actorContext.getReplicatedLog().lastIndex();
482         leader.handleMessage(followerActor, new AppendEntriesReply(
483                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
484         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
485
486         followerActor.underlyingActor().clear();
487
488         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
489         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
490         sendReplicate(actorContext, lastIndex+1);
491
492         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
493         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
494
495         assertEquals(0, allMessages.get(0).getEntries().size());
496         assertEquals(1, allMessages.get(1).getEntries().size());
497     }
498
499
500     @Test
501     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
502         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
503
504         MockRaftActorContext actorContext = createActorContext();
505
506         leader = new Leader(actorContext);
507
508         actorContext.setLastApplied(0);
509
510         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
511         long term = actorContext.getTermInformation().getCurrentTerm();
512         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
513                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
514
515         actorContext.getReplicatedLog().append(newEntry);
516
517         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
518                 new Replicate(leaderActor, "state-id", newEntry));
519
520         // State should not change
521         assertTrue(raftBehavior instanceof Leader);
522
523         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
524
525         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
526         // one since lastApplied state is 0.
527         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
528                 leaderActor, ApplyState.class);
529         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
530
531         for(int i = 0; i <= newLogIndex - 1; i++ ) {
532             ApplyState applyState = applyStateList.get(i);
533             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
534             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
535         }
536
537         ApplyState last = applyStateList.get((int) newLogIndex - 1);
538         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
539         assertEquals("getIdentifier", "state-id", last.getIdentifier());
540     }
541
542     @Test
543     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
544         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
545
546         MockRaftActorContext actorContext = createActorContextWithFollower();
547
548         Map<String, String> leadersSnapshot = new HashMap<>();
549         leadersSnapshot.put("1", "A");
550         leadersSnapshot.put("2", "B");
551         leadersSnapshot.put("3", "C");
552
553         //clears leaders log
554         actorContext.getReplicatedLog().removeFrom(0);
555
556         final int commitIndex = 3;
557         final int snapshotIndex = 2;
558         final int newEntryIndex = 4;
559         final int snapshotTerm = 1;
560         final int currentTerm = 2;
561
562         // set the snapshot variables in replicatedlog
563         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
564         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
565         actorContext.setCommitIndex(commitIndex);
566         //set follower timeout to 2 mins, helps during debugging
567         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
568
569         leader = new Leader(actorContext);
570
571         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
572         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
573
574         // new entry
575         ReplicatedLogImplEntry entry =
576                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
577                         new MockRaftActorContext.MockPayload("D"));
578
579         //update follower timestamp
580         leader.markFollowerActive(FOLLOWER_ID);
581
582         ByteString bs = toByteString(leadersSnapshot);
583         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
584                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
585         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
586         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
587
588         //send first chunk and no InstallSnapshotReply received yet
589         fts.getNextChunk();
590         fts.incrementChunkIndex();
591
592         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
593                 TimeUnit.MILLISECONDS);
594
595         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
596
597         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
598
599         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
600
601         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
602
603         //InstallSnapshotReply received
604         fts.markSendStatus(true);
605
606         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
607
608         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
609
610         assertEquals(commitIndex, is.getLastIncludedIndex());
611     }
612
613     @Test
614     public void testSendAppendEntriesSnapshotScenario() throws Exception {
615         logStart("testSendAppendEntriesSnapshotScenario");
616
617         MockRaftActorContext actorContext = createActorContextWithFollower();
618
619         Map<String, String> leadersSnapshot = new HashMap<>();
620         leadersSnapshot.put("1", "A");
621         leadersSnapshot.put("2", "B");
622         leadersSnapshot.put("3", "C");
623
624         //clears leaders log
625         actorContext.getReplicatedLog().removeFrom(0);
626
627         final int followersLastIndex = 2;
628         final int snapshotIndex = 3;
629         final int newEntryIndex = 4;
630         final int snapshotTerm = 1;
631         final int currentTerm = 2;
632
633         // set the snapshot variables in replicatedlog
634         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
635         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
636         actorContext.setCommitIndex(followersLastIndex);
637
638         leader = new Leader(actorContext);
639
640         // Leader will send an immediate heartbeat - ignore it.
641         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
642
643         // new entry
644         ReplicatedLogImplEntry entry =
645                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
646                         new MockRaftActorContext.MockPayload("D"));
647
648         actorContext.getReplicatedLog().append(entry);
649
650         //update follower timestamp
651         leader.markFollowerActive(FOLLOWER_ID);
652
653         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
654         RaftActorBehavior raftBehavior = leader.handleMessage(
655                 leaderActor, new Replicate(null, "state-id", entry));
656
657         assertTrue(raftBehavior instanceof Leader);
658
659         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
660     }
661
662     @Test
663     public void testInitiateInstallSnapshot() throws Exception {
664         logStart("testInitiateInstallSnapshot");
665
666         MockRaftActorContext actorContext = createActorContextWithFollower();
667
668         //clears leaders log
669         actorContext.getReplicatedLog().removeFrom(0);
670
671         final int followersLastIndex = 2;
672         final int snapshotIndex = 3;
673         final int newEntryIndex = 4;
674         final int snapshotTerm = 1;
675         final int currentTerm = 2;
676
677         // set the snapshot variables in replicatedlog
678         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
679         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
680         actorContext.setLastApplied(3);
681         actorContext.setCommitIndex(followersLastIndex);
682
683         leader = new Leader(actorContext);
684
685         // Leader will send an immediate heartbeat - ignore it.
686         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
687
688         // set the snapshot as absent and check if capture-snapshot is invoked.
689         leader.setSnapshot(null);
690
691         // new entry
692         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
693                 new MockRaftActorContext.MockPayload("D"));
694
695         actorContext.getReplicatedLog().append(entry);
696
697         //update follower timestamp
698         leader.markFollowerActive(FOLLOWER_ID);
699
700         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
701
702         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
703
704         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
705
706         assertTrue(cs.isInstallSnapshotInitiated());
707         assertEquals(3, cs.getLastAppliedIndex());
708         assertEquals(1, cs.getLastAppliedTerm());
709         assertEquals(4, cs.getLastIndex());
710         assertEquals(2, cs.getLastTerm());
711
712         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
713         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
714
715         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
716     }
717
718     @Test
719     public void testInitiateForceInstallSnapshot() throws Exception {
720         logStart("testInitiateForceInstallSnapshot");
721
722         MockRaftActorContext actorContext = createActorContextWithFollower();
723
724         final int followersLastIndex = 2;
725         final int snapshotIndex = -1;
726         final int newEntryIndex = 4;
727         final int snapshotTerm = -1;
728         final int currentTerm = 2;
729
730         // set the snapshot variables in replicatedlog
731         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
732         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
733         actorContext.setLastApplied(3);
734         actorContext.setCommitIndex(followersLastIndex);
735
736         actorContext.getReplicatedLog().removeFrom(0);
737
738         leader = new Leader(actorContext);
739
740         // Leader will send an immediate heartbeat - ignore it.
741         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
742
743         // set the snapshot as absent and check if capture-snapshot is invoked.
744         leader.setSnapshot(null);
745
746         for(int i=0;i<4;i++) {
747             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
748                     new MockRaftActorContext.MockPayload("X" + i)));
749         }
750
751         // new entry
752         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
753                 new MockRaftActorContext.MockPayload("D"));
754
755         actorContext.getReplicatedLog().append(entry);
756
757         //update follower timestamp
758         leader.markFollowerActive(FOLLOWER_ID);
759
760         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
761         // installed with a SendInstallSnapshot
762         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
763
764         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
765
766         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
767
768         assertTrue(cs.isInstallSnapshotInitiated());
769         assertEquals(3, cs.getLastAppliedIndex());
770         assertEquals(1, cs.getLastAppliedTerm());
771         assertEquals(4, cs.getLastIndex());
772         assertEquals(2, cs.getLastTerm());
773
774         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
775         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
776
777         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
778     }
779
780
781     @Test
782     public void testInstallSnapshot() throws Exception {
783         logStart("testInstallSnapshot");
784
785         MockRaftActorContext actorContext = createActorContextWithFollower();
786
787         Map<String, String> leadersSnapshot = new HashMap<>();
788         leadersSnapshot.put("1", "A");
789         leadersSnapshot.put("2", "B");
790         leadersSnapshot.put("3", "C");
791
792         //clears leaders log
793         actorContext.getReplicatedLog().removeFrom(0);
794
795         final int lastAppliedIndex = 3;
796         final int snapshotIndex = 2;
797         final int snapshotTerm = 1;
798         final int currentTerm = 2;
799
800         // set the snapshot variables in replicatedlog
801         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
802         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
803         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
804         actorContext.setCommitIndex(lastAppliedIndex);
805         actorContext.setLastApplied(lastAppliedIndex);
806
807         leader = new Leader(actorContext);
808
809         // Initial heartbeat.
810         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
811
812         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
813         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
814
815         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
816                 Collections.<ReplicatedLogEntry>emptyList(),
817                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
818
819         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
820
821         assertTrue(raftBehavior instanceof Leader);
822
823         // check if installsnapshot gets called with the correct values.
824
825         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
826
827         assertNotNull(installSnapshot.getData());
828         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
829         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
830
831         assertEquals(currentTerm, installSnapshot.getTerm());
832     }
833
834     @Test
835     public void testForceInstallSnapshot() throws Exception {
836         logStart("testForceInstallSnapshot");
837
838         MockRaftActorContext actorContext = createActorContextWithFollower();
839
840         Map<String, String> leadersSnapshot = new HashMap<>();
841         leadersSnapshot.put("1", "A");
842         leadersSnapshot.put("2", "B");
843         leadersSnapshot.put("3", "C");
844
845         final int lastAppliedIndex = 3;
846         final int snapshotIndex = -1;
847         final int snapshotTerm = -1;
848         final int currentTerm = 2;
849
850         // set the snapshot variables in replicatedlog
851         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
852         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
853         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
854         actorContext.setCommitIndex(lastAppliedIndex);
855         actorContext.setLastApplied(lastAppliedIndex);
856
857         leader = new Leader(actorContext);
858
859         // Initial heartbeat.
860         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
861
862         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
863         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
864
865         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
866                 Collections.<ReplicatedLogEntry>emptyList(),
867                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
868
869         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
870
871         assertTrue(raftBehavior instanceof Leader);
872
873         // check if installsnapshot gets called with the correct values.
874
875         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
876
877         assertNotNull(installSnapshot.getData());
878         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
879         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
880
881         assertEquals(currentTerm, installSnapshot.getTerm());
882     }
883
884     @Test
885     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
886         logStart("testHandleInstallSnapshotReplyLastChunk");
887
888         MockRaftActorContext actorContext = createActorContextWithFollower();
889
890         final int commitIndex = 3;
891         final int snapshotIndex = 2;
892         final int snapshotTerm = 1;
893         final int currentTerm = 2;
894
895         actorContext.setCommitIndex(commitIndex);
896
897         leader = new Leader(actorContext);
898         actorContext.setCurrentBehavior(leader);
899
900         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
901         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
902
903         // Ignore initial heartbeat.
904         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
905
906         Map<String, String> leadersSnapshot = new HashMap<>();
907         leadersSnapshot.put("1", "A");
908         leadersSnapshot.put("2", "B");
909         leadersSnapshot.put("3", "C");
910
911         // set the snapshot variables in replicatedlog
912
913         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916
917         ByteString bs = toByteString(leadersSnapshot);
918         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
919                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
920         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
921         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
922         while(!fts.isLastChunk(fts.getChunkIndex())) {
923             fts.getNextChunk();
924             fts.incrementChunkIndex();
925         }
926
927         //clears leaders log
928         actorContext.getReplicatedLog().removeFrom(0);
929
930         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
931                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
932
933         assertTrue(raftBehavior instanceof Leader);
934
935         assertEquals(0, leader.followerSnapshotSize());
936         assertEquals(1, leader.followerLogSize());
937         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
938         assertNotNull(fli);
939         assertEquals(commitIndex, fli.getMatchIndex());
940         assertEquals(commitIndex + 1, fli.getNextIndex());
941     }
942
943     @Test
944     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
945         logStart("testSendSnapshotfromInstallSnapshotReply");
946
947         MockRaftActorContext actorContext = createActorContextWithFollower();
948
949         final int commitIndex = 3;
950         final int snapshotIndex = 2;
951         final int snapshotTerm = 1;
952         final int currentTerm = 2;
953
954         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
955             @Override
956             public int getSnapshotChunkSize() {
957                 return 50;
958             }
959         };
960         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
961         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
962
963         actorContext.setConfigParams(configParams);
964         actorContext.setCommitIndex(commitIndex);
965
966         leader = new Leader(actorContext);
967         actorContext.setCurrentBehavior(leader);
968
969         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
970         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
971
972         Map<String, String> leadersSnapshot = new HashMap<>();
973         leadersSnapshot.put("1", "A");
974         leadersSnapshot.put("2", "B");
975         leadersSnapshot.put("3", "C");
976
977         // set the snapshot variables in replicatedlog
978         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
979         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
980         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
981
982         ByteString bs = toByteString(leadersSnapshot);
983         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
984                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
985         leader.setSnapshot(snapshot);
986
987         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
988
989         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
990
991         assertEquals(1, installSnapshot.getChunkIndex());
992         assertEquals(3, installSnapshot.getTotalChunks());
993
994         followerActor.underlyingActor().clear();
995         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
996                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
997
998         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
999
1000         assertEquals(2, installSnapshot.getChunkIndex());
1001         assertEquals(3, installSnapshot.getTotalChunks());
1002
1003         followerActor.underlyingActor().clear();
1004         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1005                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1006
1007         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1008
1009         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1010         followerActor.underlyingActor().clear();
1011         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1012                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1013
1014         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1015
1016         assertNull(installSnapshot);
1017     }
1018
1019
1020     @Test
1021     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
1022         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1023
1024         MockRaftActorContext actorContext = createActorContextWithFollower();
1025
1026         final int commitIndex = 3;
1027         final int snapshotIndex = 2;
1028         final int snapshotTerm = 1;
1029         final int currentTerm = 2;
1030
1031         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
1032             @Override
1033             public int getSnapshotChunkSize() {
1034                 return 50;
1035             }
1036         });
1037
1038         actorContext.setCommitIndex(commitIndex);
1039
1040         leader = new Leader(actorContext);
1041
1042         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1043         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1044
1045         Map<String, String> leadersSnapshot = new HashMap<>();
1046         leadersSnapshot.put("1", "A");
1047         leadersSnapshot.put("2", "B");
1048         leadersSnapshot.put("3", "C");
1049
1050         // set the snapshot variables in replicatedlog
1051         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1052         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1053         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1054
1055         ByteString bs = toByteString(leadersSnapshot);
1056         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1057                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1058         leader.setSnapshot(snapshot);
1059
1060         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1061         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1062
1063         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1064
1065         assertEquals(1, installSnapshot.getChunkIndex());
1066         assertEquals(3, installSnapshot.getTotalChunks());
1067
1068         followerActor.underlyingActor().clear();
1069
1070         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1071                 FOLLOWER_ID, -1, false));
1072
1073         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1074                 TimeUnit.MILLISECONDS);
1075
1076         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1077
1078         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1079
1080         assertEquals(1, installSnapshot.getChunkIndex());
1081         assertEquals(3, installSnapshot.getTotalChunks());
1082     }
1083
1084     @Test
1085     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1086         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1087
1088         MockRaftActorContext actorContext = createActorContextWithFollower();
1089
1090         final int commitIndex = 3;
1091         final int snapshotIndex = 2;
1092         final int snapshotTerm = 1;
1093         final int currentTerm = 2;
1094
1095         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1096             @Override
1097             public int getSnapshotChunkSize() {
1098                 return 50;
1099             }
1100         });
1101
1102         actorContext.setCommitIndex(commitIndex);
1103
1104         leader = new Leader(actorContext);
1105
1106         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1107         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1108
1109         Map<String, String> leadersSnapshot = new HashMap<>();
1110         leadersSnapshot.put("1", "A");
1111         leadersSnapshot.put("2", "B");
1112         leadersSnapshot.put("3", "C");
1113
1114         // set the snapshot variables in replicatedlog
1115         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1116         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1117         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1118
1119         ByteString bs = toByteString(leadersSnapshot);
1120         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1121                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1122         leader.setSnapshot(snapshot);
1123
1124         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1125
1126         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1127
1128         assertEquals(1, installSnapshot.getChunkIndex());
1129         assertEquals(3, installSnapshot.getTotalChunks());
1130         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1131
1132         int hashCode = Arrays.hashCode(installSnapshot.getData());
1133
1134         followerActor.underlyingActor().clear();
1135
1136         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1137                 FOLLOWER_ID, 1, true));
1138
1139         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1140
1141         assertEquals(2, installSnapshot.getChunkIndex());
1142         assertEquals(3, installSnapshot.getTotalChunks());
1143         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1144     }
1145
1146     @Test
1147     public void testFollowerToSnapshotLogic() {
1148         logStart("testFollowerToSnapshotLogic");
1149
1150         MockRaftActorContext actorContext = createActorContext();
1151
1152         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1153             @Override
1154             public int getSnapshotChunkSize() {
1155                 return 50;
1156             }
1157         });
1158
1159         leader = new Leader(actorContext);
1160
1161         Map<String, String> leadersSnapshot = new HashMap<>();
1162         leadersSnapshot.put("1", "A");
1163         leadersSnapshot.put("2", "B");
1164         leadersSnapshot.put("3", "C");
1165
1166         ByteString bs = toByteString(leadersSnapshot);
1167         byte[] barray = bs.toByteArray();
1168
1169         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1170         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1171
1172         assertEquals(bs.size(), barray.length);
1173
1174         int chunkIndex=0;
1175         for (int i=0; i < barray.length; i = i + 50) {
1176             int j = i + 50;
1177             chunkIndex++;
1178
1179             if (i + 50 > barray.length) {
1180                 j = barray.length;
1181             }
1182
1183             byte[] chunk = fts.getNextChunk();
1184             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
1185             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1186
1187             fts.markSendStatus(true);
1188             if (!fts.isLastChunk(chunkIndex)) {
1189                 fts.incrementChunkIndex();
1190             }
1191         }
1192
1193         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1194     }
1195
1196     @Override
1197     protected Leader createBehavior(final RaftActorContext actorContext) {
1198         return new Leader(actorContext);
1199     }
1200
1201     @Override
1202     protected MockRaftActorContext createActorContext() {
1203         return createActorContext(leaderActor);
1204     }
1205
1206     @Override
1207     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1208         return createActorContext(LEADER_ID, actorRef);
1209     }
1210
1211     private MockRaftActorContext createActorContextWithFollower() {
1212         MockRaftActorContext actorContext = createActorContext();
1213         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1214                 followerActor.path().toString()).build());
1215         return actorContext;
1216     }
1217
1218     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1219         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1220         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1221         configParams.setElectionTimeoutFactor(100000);
1222         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1223         context.setConfigParams(configParams);
1224         context.setPayloadVersion(payloadVersion);
1225         return context;
1226     }
1227
1228     private MockRaftActorContext createFollowerActorContextWithLeader() {
1229         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1230         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1231         followerConfig.setElectionTimeoutFactor(10000);
1232         followerActorContext.setConfigParams(followerConfig);
1233         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1234         return followerActorContext;
1235     }
1236
1237     @Test
1238     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1239         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1240
1241         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1242
1243         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1244
1245         Follower follower = new Follower(followerActorContext);
1246         followerActor.underlyingActor().setBehavior(follower);
1247         followerActorContext.setCurrentBehavior(follower);
1248
1249         Map<String, String> peerAddresses = new HashMap<>();
1250         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1251
1252         leaderActorContext.setPeerAddresses(peerAddresses);
1253
1254         leaderActorContext.getReplicatedLog().removeFrom(0);
1255
1256         //create 3 entries
1257         leaderActorContext.setReplicatedLog(
1258                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1259
1260         leaderActorContext.setCommitIndex(1);
1261
1262         followerActorContext.getReplicatedLog().removeFrom(0);
1263
1264         // follower too has the exact same log entries and has the same commit index
1265         followerActorContext.setReplicatedLog(
1266                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1267
1268         followerActorContext.setCommitIndex(1);
1269
1270         leader = new Leader(leaderActorContext);
1271         leaderActorContext.setCurrentBehavior(leader);
1272
1273         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1274
1275         assertEquals(1, appendEntries.getLeaderCommit());
1276         assertEquals(0, appendEntries.getEntries().size());
1277         assertEquals(0, appendEntries.getPrevLogIndex());
1278
1279         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1280                 leaderActor, AppendEntriesReply.class);
1281
1282         assertEquals(2, appendEntriesReply.getLogLastIndex());
1283         assertEquals(1, appendEntriesReply.getLogLastTerm());
1284
1285         // follower returns its next index
1286         assertEquals(2, appendEntriesReply.getLogLastIndex());
1287         assertEquals(1, appendEntriesReply.getLogLastTerm());
1288
1289         follower.close();
1290     }
1291
1292     @Test
1293     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1294         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1295
1296         MockRaftActorContext leaderActorContext = createActorContext();
1297
1298         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1299         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1300
1301         Follower follower = new Follower(followerActorContext);
1302         followerActor.underlyingActor().setBehavior(follower);
1303         followerActorContext.setCurrentBehavior(follower);
1304
1305         Map<String, String> leaderPeerAddresses = new HashMap<>();
1306         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1307
1308         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1309
1310         leaderActorContext.getReplicatedLog().removeFrom(0);
1311
1312         leaderActorContext.setReplicatedLog(
1313                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1314
1315         leaderActorContext.setCommitIndex(1);
1316
1317         followerActorContext.getReplicatedLog().removeFrom(0);
1318
1319         followerActorContext.setReplicatedLog(
1320                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1321
1322         // follower has the same log entries but its commit index > leaders commit index
1323         followerActorContext.setCommitIndex(2);
1324
1325         leader = new Leader(leaderActorContext);
1326
1327         // Initial heartbeat
1328         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1329
1330         assertEquals(1, appendEntries.getLeaderCommit());
1331         assertEquals(0, appendEntries.getEntries().size());
1332         assertEquals(0, appendEntries.getPrevLogIndex());
1333
1334         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1335                 leaderActor, AppendEntriesReply.class);
1336
1337         assertEquals(2, appendEntriesReply.getLogLastIndex());
1338         assertEquals(1, appendEntriesReply.getLogLastTerm());
1339
1340         leaderActor.underlyingActor().setBehavior(follower);
1341         leader.handleMessage(followerActor, appendEntriesReply);
1342
1343         leaderActor.underlyingActor().clear();
1344         followerActor.underlyingActor().clear();
1345
1346         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1347                 TimeUnit.MILLISECONDS);
1348
1349         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1350
1351         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1352
1353         assertEquals(2, appendEntries.getLeaderCommit());
1354         assertEquals(0, appendEntries.getEntries().size());
1355         assertEquals(2, appendEntries.getPrevLogIndex());
1356
1357         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1358
1359         assertEquals(2, appendEntriesReply.getLogLastIndex());
1360         assertEquals(1, appendEntriesReply.getLogLastTerm());
1361
1362         assertEquals(2, followerActorContext.getCommitIndex());
1363
1364         follower.close();
1365     }
1366
1367     @Test
1368     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1369         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1370
1371         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1372         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1373                 new FiniteDuration(1000, TimeUnit.SECONDS));
1374
1375         leaderActorContext.setReplicatedLog(
1376                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1377         long leaderCommitIndex = 2;
1378         leaderActorContext.setCommitIndex(leaderCommitIndex);
1379         leaderActorContext.setLastApplied(leaderCommitIndex);
1380
1381         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1382         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1383
1384         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1385
1386         followerActorContext.setReplicatedLog(
1387                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1388         followerActorContext.setCommitIndex(0);
1389         followerActorContext.setLastApplied(0);
1390
1391         Follower follower = new Follower(followerActorContext);
1392         followerActor.underlyingActor().setBehavior(follower);
1393
1394         leader = new Leader(leaderActorContext);
1395
1396         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1397         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1398
1399         MessageCollectorActor.clearMessages(followerActor);
1400         MessageCollectorActor.clearMessages(leaderActor);
1401
1402         // Verify initial AppendEntries sent with the leader's current commit index.
1403         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1404         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1405         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1406
1407         leaderActor.underlyingActor().setBehavior(leader);
1408
1409         leader.handleMessage(followerActor, appendEntriesReply);
1410
1411         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1412         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1413
1414         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1415         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1416         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1417
1418         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1419         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1420                 appendEntries.getEntries().get(0).getData());
1421         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1422         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1423                 appendEntries.getEntries().get(1).getData());
1424
1425         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1426         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1427
1428         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1429
1430         ApplyState applyState = applyStateList.get(0);
1431         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1432         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1433         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1434                 applyState.getReplicatedLogEntry().getData());
1435
1436         applyState = applyStateList.get(1);
1437         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1438         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1439         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1440                 applyState.getReplicatedLogEntry().getData());
1441
1442         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1443         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1444     }
1445
1446     @Test
1447     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1448         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1449
1450         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1451         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1452                 new FiniteDuration(1000, TimeUnit.SECONDS));
1453
1454         leaderActorContext.setReplicatedLog(
1455                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1456         long leaderCommitIndex = 1;
1457         leaderActorContext.setCommitIndex(leaderCommitIndex);
1458         leaderActorContext.setLastApplied(leaderCommitIndex);
1459
1460         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1461         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1462
1463         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1464
1465         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1466         followerActorContext.setCommitIndex(-1);
1467         followerActorContext.setLastApplied(-1);
1468
1469         Follower follower = new Follower(followerActorContext);
1470         followerActor.underlyingActor().setBehavior(follower);
1471         followerActorContext.setCurrentBehavior(follower);
1472
1473         leader = new Leader(leaderActorContext);
1474
1475         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1476         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1477
1478         MessageCollectorActor.clearMessages(followerActor);
1479         MessageCollectorActor.clearMessages(leaderActor);
1480
1481         // Verify initial AppendEntries sent with the leader's current commit index.
1482         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1483         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1484         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1485
1486         leaderActor.underlyingActor().setBehavior(leader);
1487         leaderActorContext.setCurrentBehavior(leader);
1488
1489         leader.handleMessage(followerActor, appendEntriesReply);
1490
1491         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1492         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1493
1494         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1495         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1496         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1497
1498         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1499         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1500                 appendEntries.getEntries().get(0).getData());
1501         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1502         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1503                 appendEntries.getEntries().get(1).getData());
1504
1505         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1506         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1507
1508         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1509
1510         ApplyState applyState = applyStateList.get(0);
1511         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1512         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1513         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1514                 applyState.getReplicatedLogEntry().getData());
1515
1516         applyState = applyStateList.get(1);
1517         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1518         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1519         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1520                 applyState.getReplicatedLogEntry().getData());
1521
1522         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1523         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1524     }
1525
1526     @Test
1527     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1528         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1529
1530         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1531         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1532                 new FiniteDuration(1000, TimeUnit.SECONDS));
1533
1534         leaderActorContext.setReplicatedLog(
1535                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1536         long leaderCommitIndex = 1;
1537         leaderActorContext.setCommitIndex(leaderCommitIndex);
1538         leaderActorContext.setLastApplied(leaderCommitIndex);
1539
1540         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1541         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1542
1543         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1544
1545         followerActorContext.setReplicatedLog(
1546                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1547         followerActorContext.setCommitIndex(-1);
1548         followerActorContext.setLastApplied(-1);
1549
1550         Follower follower = new Follower(followerActorContext);
1551         followerActor.underlyingActor().setBehavior(follower);
1552         followerActorContext.setCurrentBehavior(follower);
1553
1554         leader = new Leader(leaderActorContext);
1555
1556         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1557         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1558
1559         MessageCollectorActor.clearMessages(followerActor);
1560         MessageCollectorActor.clearMessages(leaderActor);
1561
1562         // Verify initial AppendEntries sent with the leader's current commit index.
1563         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1564         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1565         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1566
1567         leaderActor.underlyingActor().setBehavior(leader);
1568         leaderActorContext.setCurrentBehavior(leader);
1569
1570         leader.handleMessage(followerActor, appendEntriesReply);
1571
1572         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1573         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1574
1575         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1576         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1577         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1578
1579         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1580         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1581         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1582                 appendEntries.getEntries().get(0).getData());
1583         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1584         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1585         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1586                 appendEntries.getEntries().get(1).getData());
1587
1588         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1589         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1590
1591         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1592
1593         ApplyState applyState = applyStateList.get(0);
1594         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1595         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1596         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1597                 applyState.getReplicatedLogEntry().getData());
1598
1599         applyState = applyStateList.get(1);
1600         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1601         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1602         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1603                 applyState.getReplicatedLogEntry().getData());
1604
1605         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1606         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1607         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1608     }
1609
1610     @Test
1611     public void testHandleAppendEntriesReplyWithNewerTerm(){
1612         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1613
1614         MockRaftActorContext leaderActorContext = createActorContext();
1615         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1616                 new FiniteDuration(10000, TimeUnit.SECONDS));
1617
1618         leaderActorContext.setReplicatedLog(
1619                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1620
1621         leader = new Leader(leaderActorContext);
1622         leaderActor.underlyingActor().setBehavior(leader);
1623         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1624
1625         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1626
1627         assertEquals(false, appendEntriesReply.isSuccess());
1628         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1629
1630         MessageCollectorActor.clearMessages(leaderActor);
1631     }
1632
1633     @Test
1634     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1635         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1636
1637         MockRaftActorContext leaderActorContext = createActorContext();
1638         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1639                 new FiniteDuration(10000, TimeUnit.SECONDS));
1640
1641         leaderActorContext.setReplicatedLog(
1642                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1643         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1644
1645         leader = new Leader(leaderActorContext);
1646         leaderActor.underlyingActor().setBehavior(leader);
1647         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1648
1649         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1650
1651         assertEquals(false, appendEntriesReply.isSuccess());
1652         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1653
1654         MessageCollectorActor.clearMessages(leaderActor);
1655     }
1656
1657     @Test
1658     public void testHandleAppendEntriesReplySuccess() throws Exception {
1659         logStart("testHandleAppendEntriesReplySuccess");
1660
1661         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1662
1663         leaderActorContext.setReplicatedLog(
1664                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1665
1666         leaderActorContext.setCommitIndex(1);
1667         leaderActorContext.setLastApplied(1);
1668         leaderActorContext.getTermInformation().update(1, "leader");
1669
1670         leader = new Leader(leaderActorContext);
1671
1672         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1673
1674         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1675         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1676
1677         short payloadVersion = 5;
1678         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1679
1680         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1681
1682         assertEquals(RaftState.Leader, raftActorBehavior.state());
1683
1684         assertEquals(2, leaderActorContext.getCommitIndex());
1685
1686         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1687                 leaderActor, ApplyJournalEntries.class);
1688
1689         assertEquals(2, leaderActorContext.getLastApplied());
1690
1691         assertEquals(2, applyJournalEntries.getToIndex());
1692
1693         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1694                 ApplyState.class);
1695
1696         assertEquals(1,applyStateList.size());
1697
1698         ApplyState applyState = applyStateList.get(0);
1699
1700         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1701
1702         assertEquals(2, followerInfo.getMatchIndex());
1703         assertEquals(3, followerInfo.getNextIndex());
1704         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1705         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1706     }
1707
1708     @Test
1709     public void testHandleAppendEntriesReplyUnknownFollower(){
1710         logStart("testHandleAppendEntriesReplyUnknownFollower");
1711
1712         MockRaftActorContext leaderActorContext = createActorContext();
1713
1714         leader = new Leader(leaderActorContext);
1715
1716         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1717
1718         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1719
1720         assertEquals(RaftState.Leader, raftActorBehavior.state());
1721     }
1722
1723     @Test
1724     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1725         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1726
1727         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1728         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1729                 new FiniteDuration(1000, TimeUnit.SECONDS));
1730         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1731
1732         leaderActorContext.setReplicatedLog(
1733                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1734         long leaderCommitIndex = 3;
1735         leaderActorContext.setCommitIndex(leaderCommitIndex);
1736         leaderActorContext.setLastApplied(leaderCommitIndex);
1737
1738         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1739         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1740         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1741         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1742
1743         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1744
1745         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1746         followerActorContext.setCommitIndex(-1);
1747         followerActorContext.setLastApplied(-1);
1748
1749         Follower follower = new Follower(followerActorContext);
1750         followerActor.underlyingActor().setBehavior(follower);
1751         followerActorContext.setCurrentBehavior(follower);
1752
1753         leader = new Leader(leaderActorContext);
1754
1755         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1756         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1757
1758         MessageCollectorActor.clearMessages(followerActor);
1759         MessageCollectorActor.clearMessages(leaderActor);
1760
1761         // Verify initial AppendEntries sent with the leader's current commit index.
1762         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1763         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1764         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1765
1766         leaderActor.underlyingActor().setBehavior(leader);
1767         leaderActorContext.setCurrentBehavior(leader);
1768
1769         leader.handleMessage(followerActor, appendEntriesReply);
1770
1771         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1772         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1773
1774         appendEntries = appendEntriesList.get(0);
1775         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1776         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1777         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1778
1779         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1780         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1781                 appendEntries.getEntries().get(0).getData());
1782         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1783         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1784                 appendEntries.getEntries().get(1).getData());
1785
1786         appendEntries = appendEntriesList.get(1);
1787         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1788         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1789         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1790
1791         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1792         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1793                 appendEntries.getEntries().get(0).getData());
1794         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1795         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1796                 appendEntries.getEntries().get(1).getData());
1797
1798         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1799         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1800
1801         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1802
1803         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1804         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1805     }
1806
1807     @Test
1808     public void testHandleRequestVoteReply(){
1809         logStart("testHandleRequestVoteReply");
1810
1811         MockRaftActorContext leaderActorContext = createActorContext();
1812
1813         leader = new Leader(leaderActorContext);
1814
1815         // Should be a no-op.
1816         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1817                 new RequestVoteReply(1, true));
1818
1819         assertEquals(RaftState.Leader, raftActorBehavior.state());
1820
1821         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1822
1823         assertEquals(RaftState.Leader, raftActorBehavior.state());
1824     }
1825
1826     @Test
1827     public void testIsolatedLeaderCheckNoFollowers() {
1828         logStart("testIsolatedLeaderCheckNoFollowers");
1829
1830         MockRaftActorContext leaderActorContext = createActorContext();
1831
1832         leader = new Leader(leaderActorContext);
1833         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1834         assertTrue(behavior instanceof Leader);
1835     }
1836
1837     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1838         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1839         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1840
1841         MockRaftActorContext leaderActorContext = createActorContext();
1842
1843         Map<String, String> peerAddresses = new HashMap<>();
1844         peerAddresses.put("follower-1", followerActor1.path().toString());
1845         peerAddresses.put("follower-2", followerActor2.path().toString());
1846
1847         leaderActorContext.setPeerAddresses(peerAddresses);
1848         leaderActorContext.setRaftPolicy(raftPolicy);
1849
1850         leader = new Leader(leaderActorContext);
1851
1852         leader.markFollowerActive("follower-1");
1853         leader.markFollowerActive("follower-2");
1854         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1855         assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
1856
1857         // kill 1 follower and verify if that got killed
1858         final JavaTestKit probe = new JavaTestKit(getSystem());
1859         probe.watch(followerActor1);
1860         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1861         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1862         assertEquals(termMsg1.getActor(), followerActor1);
1863
1864         leader.markFollowerInActive("follower-1");
1865         leader.markFollowerActive("follower-2");
1866         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1867         assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
1868
1869         // kill 2nd follower and leader should change to Isolated leader
1870         followerActor2.tell(PoisonPill.getInstance(), null);
1871         probe.watch(followerActor2);
1872         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1873         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1874         assertEquals(termMsg2.getActor(), followerActor2);
1875
1876         leader.markFollowerInActive("follower-2");
1877         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1878     }
1879
1880     @Test
1881     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1882         logStart("testIsolatedLeaderCheckTwoFollowers");
1883
1884         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1885
1886         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1887             behavior instanceof IsolatedLeader);
1888     }
1889
1890     @Test
1891     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1892         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1893
1894         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1895
1896         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1897                 behavior instanceof Leader);
1898     }
1899
1900     @Test
1901     public void testLaggingFollowerStarvation() throws Exception {
1902         logStart("testLaggingFollowerStarvation");
1903         new JavaTestKit(getSystem()) {{
1904             String leaderActorId = actorFactory.generateActorId("leader");
1905             String follower1ActorId = actorFactory.generateActorId("follower");
1906             String follower2ActorId = actorFactory.generateActorId("follower");
1907
1908             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1909                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1910             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1911             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1912
1913             MockRaftActorContext leaderActorContext =
1914                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1915
1916             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1917             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1918             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1919
1920             leaderActorContext.setConfigParams(configParams);
1921
1922             leaderActorContext.setReplicatedLog(
1923                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1924
1925             Map<String, String> peerAddresses = new HashMap<>();
1926             peerAddresses.put(follower1ActorId,
1927                     follower1Actor.path().toString());
1928             peerAddresses.put(follower2ActorId,
1929                     follower2Actor.path().toString());
1930
1931             leaderActorContext.setPeerAddresses(peerAddresses);
1932             leaderActorContext.getTermInformation().update(1, leaderActorId);
1933
1934             RaftActorBehavior leader = createBehavior(leaderActorContext);
1935
1936             leaderActor.underlyingActor().setBehavior(leader);
1937
1938             for(int i=1;i<6;i++) {
1939                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1940                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1941                 assertTrue(newBehavior == leader);
1942                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1943             }
1944
1945             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1946             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1947
1948             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1949                     heartbeats.size() > 1);
1950
1951             // Check if follower-2 got AppendEntries during this time and was not starved
1952             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1953
1954             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1955                     appendEntries.size() > 1);
1956
1957         }};
1958     }
1959
1960     @Test
1961     public void testReplicationConsensusWithNonVotingFollower() {
1962         logStart("testReplicationConsensusWithNonVotingFollower");
1963
1964         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1965         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1966                 new FiniteDuration(1000, TimeUnit.SECONDS));
1967
1968         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1969
1970         String nonVotingFollowerId = "nonvoting-follower";
1971         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1972                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1973
1974         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1975
1976         leader = new Leader(leaderActorContext);
1977         leaderActorContext.setCurrentBehavior(leader);
1978
1979         // Ignore initial heartbeats
1980         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1981         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1982
1983         MessageCollectorActor.clearMessages(followerActor);
1984         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1985         MessageCollectorActor.clearMessages(leaderActor);
1986
1987         // Send a Replicate message and wait for AppendEntries.
1988         sendReplicate(leaderActorContext, 0);
1989
1990         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1991         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1992
1993         // Send reply only from the voting follower and verify consensus via ApplyState.
1994         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1995
1996         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1997
1998         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1999
2000         MessageCollectorActor.clearMessages(followerActor);
2001         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2002         MessageCollectorActor.clearMessages(leaderActor);
2003
2004         // Send another Replicate message
2005         sendReplicate(leaderActorContext, 1);
2006
2007         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2008         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2009                 AppendEntries.class);
2010         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2011         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2012
2013         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2014         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2015
2016         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2017
2018         // Send reply from the voting follower and verify consensus.
2019         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2020
2021         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2022     }
2023
2024     @Test
2025     public void testTransferLeadershipWithFollowerInSync() {
2026         logStart("testTransferLeadershipWithFollowerInSync");
2027
2028         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2029         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2030                 new FiniteDuration(1000, TimeUnit.SECONDS));
2031         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2032
2033         leader = new Leader(leaderActorContext);
2034         leaderActorContext.setCurrentBehavior(leader);
2035
2036         // Initial heartbeat
2037         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2038         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2039         MessageCollectorActor.clearMessages(followerActor);
2040
2041         sendReplicate(leaderActorContext, 0);
2042         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2043
2044         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2045         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2046         MessageCollectorActor.clearMessages(followerActor);
2047
2048         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2049         leader.transferLeadership(mockTransferCohort);
2050
2051         verify(mockTransferCohort, never()).transferComplete();
2052         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2053         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2054
2055         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2056         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2057
2058         // Leader should force an election timeout
2059         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2060
2061         verify(mockTransferCohort).transferComplete();
2062     }
2063
2064     @Test
2065     public void testTransferLeadershipWithEmptyLog() {
2066         logStart("testTransferLeadershipWithEmptyLog");
2067
2068         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2069         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2070                 new FiniteDuration(1000, TimeUnit.SECONDS));
2071         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2072
2073         leader = new Leader(leaderActorContext);
2074         leaderActorContext.setCurrentBehavior(leader);
2075
2076         // Initial heartbeat
2077         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2078         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2079         MessageCollectorActor.clearMessages(followerActor);
2080
2081         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2082         leader.transferLeadership(mockTransferCohort);
2083
2084         verify(mockTransferCohort, never()).transferComplete();
2085         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2086         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2087
2088         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2089         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2090
2091         // Leader should force an election timeout
2092         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2093
2094         verify(mockTransferCohort).transferComplete();
2095     }
2096
2097     @Test
2098     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2099         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2100
2101         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2102         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2103                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2104
2105         leader = new Leader(leaderActorContext);
2106         leaderActorContext.setCurrentBehavior(leader);
2107
2108         // Initial heartbeat
2109         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2110         MessageCollectorActor.clearMessages(followerActor);
2111
2112         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2113         leader.transferLeadership(mockTransferCohort);
2114
2115         verify(mockTransferCohort, never()).transferComplete();
2116
2117         // Sync up the follower.
2118         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2119         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2120         MessageCollectorActor.clearMessages(followerActor);
2121
2122         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2123                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2124         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2125         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2126         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2127
2128         // Leader should force an election timeout
2129         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2130
2131         verify(mockTransferCohort).transferComplete();
2132     }
2133
2134     @Test
2135     public void testTransferLeadershipWithFollowerSyncTimeout() {
2136         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2137
2138         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2139         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2140                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2141         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2142         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2143
2144         leader = new Leader(leaderActorContext);
2145         leaderActorContext.setCurrentBehavior(leader);
2146
2147         // Initial heartbeat
2148         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2150         MessageCollectorActor.clearMessages(followerActor);
2151
2152         sendReplicate(leaderActorContext, 0);
2153         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154
2155         MessageCollectorActor.clearMessages(followerActor);
2156
2157         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2158         leader.transferLeadership(mockTransferCohort);
2159
2160         verify(mockTransferCohort, never()).transferComplete();
2161
2162         // Send heartbeats to time out the transfer.
2163         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2164             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2165                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2166             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2167         }
2168
2169         verify(mockTransferCohort).abortTransfer();
2170         verify(mockTransferCohort, never()).transferComplete();
2171         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2172     }
2173
2174     @Override
2175     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2176             ActorRef actorRef, RaftRPC rpc) throws Exception {
2177         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2178         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2179     }
2180
2181     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2182
2183         private final long electionTimeOutIntervalMillis;
2184         private final int snapshotChunkSize;
2185
2186         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2187             super();
2188             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2189             this.snapshotChunkSize = snapshotChunkSize;
2190         }
2191
2192         @Override
2193         public FiniteDuration getElectionTimeOutInterval() {
2194             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2195         }
2196
2197         @Override
2198         public int getSnapshotChunkSize() {
2199             return snapshotChunkSize;
2200         }
2201     }
2202 }