Add leader unit test for non-voting consensus
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.VotingState;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
43 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
45 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
47 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
48 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
50 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
52 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
53 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
54 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
55 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
56 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
57 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import scala.concurrent.duration.FiniteDuration;
60
61 public class LeaderTest extends AbstractLeaderTest {
62
63     static final String FOLLOWER_ID = "follower";
64     public static final String LEADER_ID = "leader";
65
66     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
67             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
68
69     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
70             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
71
72     private Leader leader;
73     private final short payloadVersion = 5;
74
75     @Override
76     @After
77     public void tearDown() throws Exception {
78         if(leader != null) {
79             leader.close();
80         }
81
82         super.tearDown();
83     }
84
85     @Test
86     public void testHandleMessageForUnknownMessage() throws Exception {
87         logStart("testHandleMessageForUnknownMessage");
88
89         leader = new Leader(createActorContext());
90
91         // handle message should return the Leader state when it receives an
92         // unknown message
93         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
94         Assert.assertTrue(behavior instanceof Leader);
95     }
96
97     @Test
98     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
99         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
100
101         MockRaftActorContext actorContext = createActorContextWithFollower();
102         short payloadVersion = (short)5;
103         actorContext.setPayloadVersion(payloadVersion);
104
105         long term = 1;
106         actorContext.getTermInformation().update(term, "");
107
108         leader = new Leader(actorContext);
109
110         // Leader should send an immediate heartbeat with no entries as follower is inactive.
111         long lastIndex = actorContext.getReplicatedLog().lastIndex();
112         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
113         assertEquals("getTerm", term, appendEntries.getTerm());
114         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
115         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
116         assertEquals("Entries size", 0, appendEntries.getEntries().size());
117         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
118
119         // The follower would normally reply - simulate that explicitly here.
120         leader.handleMessage(followerActor, new AppendEntriesReply(
121                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
122         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
123
124         followerActor.underlyingActor().clear();
125
126         // Sleep for the heartbeat interval so AppendEntries is sent.
127         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
128                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
129
130         leader.handleMessage(leaderActor, new SendHeartBeat());
131
132         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
134         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
135         assertEquals("Entries size", 1, appendEntries.getEntries().size());
136         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
137         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
138         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
139     }
140
141
142     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
143         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
144         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
145                 1, index, payload);
146         actorContext.getReplicatedLog().append(newEntry);
147         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
148     }
149
150     @Test
151     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
152         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
153
154         MockRaftActorContext actorContext = createActorContextWithFollower();
155
156         long term = 1;
157         actorContext.getTermInformation().update(term, "");
158
159         leader = new Leader(actorContext);
160
161         // Leader will send an immediate heartbeat - ignore it.
162         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
163
164         // The follower would normally reply - simulate that explicitly here.
165         long lastIndex = actorContext.getReplicatedLog().lastIndex();
166         leader.handleMessage(followerActor, new AppendEntriesReply(
167                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
168         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
169
170         followerActor.underlyingActor().clear();
171
172         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
173
174         // State should not change
175         assertTrue(raftBehavior instanceof Leader);
176
177         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
178         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
179         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
180         assertEquals("Entries size", 1, appendEntries.getEntries().size());
181         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
182         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
183         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
184         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
185     }
186
187     @Test
188     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
189         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
190
191         MockRaftActorContext actorContext = createActorContextWithFollower();
192         actorContext.setRaftPolicy(createRaftPolicy(true, true));
193
194         long term = 1;
195         actorContext.getTermInformation().update(term, "");
196
197         leader = new Leader(actorContext);
198
199         // Leader will send an immediate heartbeat - ignore it.
200         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
201
202         // The follower would normally reply - simulate that explicitly here.
203         long lastIndex = actorContext.getReplicatedLog().lastIndex();
204         leader.handleMessage(followerActor, new AppendEntriesReply(
205                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
206         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
207
208         followerActor.underlyingActor().clear();
209
210         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
211
212         // State should not change
213         assertTrue(raftBehavior instanceof Leader);
214
215         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
216         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
217         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
218         assertEquals("Entries size", 1, appendEntries.getEntries().size());
219         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
220         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
221         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
222         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
223     }
224
225     @Test
226     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
227         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
228
229         MockRaftActorContext actorContext = createActorContextWithFollower();
230         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
231             @Override
232             public FiniteDuration getHeartBeatInterval() {
233                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
234             }
235         });
236
237         long term = 1;
238         actorContext.getTermInformation().update(term, "");
239
240         leader = new Leader(actorContext);
241
242         // Leader will send an immediate heartbeat - ignore it.
243         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
244
245         // The follower would normally reply - simulate that explicitly here.
246         long lastIndex = actorContext.getReplicatedLog().lastIndex();
247         leader.handleMessage(followerActor, new AppendEntriesReply(
248                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
249         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
250
251         followerActor.underlyingActor().clear();
252
253         for(int i=0;i<5;i++) {
254             sendReplicate(actorContext, lastIndex+i+1);
255         }
256
257         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
258         // We expect only 1 message to be sent because of two reasons,
259         // - an append entries reply was not received
260         // - the heartbeat interval has not expired
261         // In this scenario if multiple messages are sent they would likely be duplicates
262         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
263     }
264
265     @Test
266     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
267         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
268
269         MockRaftActorContext actorContext = createActorContextWithFollower();
270         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
271             @Override
272             public FiniteDuration getHeartBeatInterval() {
273                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
274             }
275         });
276
277         long term = 1;
278         actorContext.getTermInformation().update(term, "");
279
280         leader = new Leader(actorContext);
281
282         // Leader will send an immediate heartbeat - ignore it.
283         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284
285         // The follower would normally reply - simulate that explicitly here.
286         long lastIndex = actorContext.getReplicatedLog().lastIndex();
287         leader.handleMessage(followerActor, new AppendEntriesReply(
288                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
289         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290
291         followerActor.underlyingActor().clear();
292
293         for(int i=0;i<3;i++) {
294             sendReplicate(actorContext, lastIndex+i+1);
295             leader.handleMessage(followerActor, new AppendEntriesReply(
296                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
297
298         }
299
300         for(int i=3;i<5;i++) {
301             sendReplicate(actorContext, lastIndex + i + 1);
302         }
303
304         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
305         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
306         // get sent to the follower - but not the 5th
307         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
308
309         for(int i=0;i<4;i++) {
310             long expected = allMessages.get(i).getEntries().get(0).getIndex();
311             assertEquals(expected, i+2);
312         }
313     }
314
315     @Test
316     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
317         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
318
319         MockRaftActorContext actorContext = createActorContextWithFollower();
320         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
321             @Override
322             public FiniteDuration getHeartBeatInterval() {
323                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
324             }
325         });
326
327         long term = 1;
328         actorContext.getTermInformation().update(term, "");
329
330         leader = new Leader(actorContext);
331
332         // Leader will send an immediate heartbeat - ignore it.
333         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
334
335         // The follower would normally reply - simulate that explicitly here.
336         long lastIndex = actorContext.getReplicatedLog().lastIndex();
337         leader.handleMessage(followerActor, new AppendEntriesReply(
338                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
339         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
340
341         followerActor.underlyingActor().clear();
342
343         sendReplicate(actorContext, lastIndex+1);
344
345         // Wait slightly longer than heartbeat duration
346         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
347
348         leader.handleMessage(leaderActor, new SendHeartBeat());
349
350         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
351         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
352
353         assertEquals(1, allMessages.get(0).getEntries().size());
354         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
355         assertEquals(1, allMessages.get(1).getEntries().size());
356         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
357
358     }
359
360     @Test
361     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
362         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
363
364         MockRaftActorContext actorContext = createActorContextWithFollower();
365         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
366             @Override
367             public FiniteDuration getHeartBeatInterval() {
368                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
369             }
370         });
371
372         long term = 1;
373         actorContext.getTermInformation().update(term, "");
374
375         leader = new Leader(actorContext);
376
377         // Leader will send an immediate heartbeat - ignore it.
378         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
379
380         // The follower would normally reply - simulate that explicitly here.
381         long lastIndex = actorContext.getReplicatedLog().lastIndex();
382         leader.handleMessage(followerActor, new AppendEntriesReply(
383                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
384         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
385
386         followerActor.underlyingActor().clear();
387
388         for(int i=0;i<3;i++) {
389             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
390             leader.handleMessage(leaderActor, new SendHeartBeat());
391         }
392
393         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
394         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
395     }
396
397     @Test
398     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
399         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
400
401         MockRaftActorContext actorContext = createActorContextWithFollower();
402         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
403             @Override
404             public FiniteDuration getHeartBeatInterval() {
405                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
406             }
407         });
408
409         long term = 1;
410         actorContext.getTermInformation().update(term, "");
411
412         leader = new Leader(actorContext);
413
414         // Leader will send an immediate heartbeat - ignore it.
415         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
416
417         // The follower would normally reply - simulate that explicitly here.
418         long lastIndex = actorContext.getReplicatedLog().lastIndex();
419         leader.handleMessage(followerActor, new AppendEntriesReply(
420                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
421         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
422
423         followerActor.underlyingActor().clear();
424
425         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
426         leader.handleMessage(leaderActor, new SendHeartBeat());
427         sendReplicate(actorContext, lastIndex+1);
428
429         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
430         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
431
432         assertEquals(0, allMessages.get(0).getEntries().size());
433         assertEquals(1, allMessages.get(1).getEntries().size());
434     }
435
436
437     @Test
438     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
439         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
440
441         MockRaftActorContext actorContext = createActorContext();
442
443         leader = new Leader(actorContext);
444
445         actorContext.setLastApplied(0);
446
447         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
448         long term = actorContext.getTermInformation().getCurrentTerm();
449         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
450                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
451
452         actorContext.getReplicatedLog().append(newEntry);
453
454         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
455                 new Replicate(leaderActor, "state-id", newEntry));
456
457         // State should not change
458         assertTrue(raftBehavior instanceof Leader);
459
460         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
461
462         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
463         // one since lastApplied state is 0.
464         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
465                 leaderActor, ApplyState.class);
466         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
467
468         for(int i = 0; i <= newLogIndex - 1; i++ ) {
469             ApplyState applyState = applyStateList.get(i);
470             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
471             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
472         }
473
474         ApplyState last = applyStateList.get((int) newLogIndex - 1);
475         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
476         assertEquals("getIdentifier", "state-id", last.getIdentifier());
477     }
478
479     @Test
480     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
481         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
482
483         MockRaftActorContext actorContext = createActorContextWithFollower();
484
485         Map<String, String> leadersSnapshot = new HashMap<>();
486         leadersSnapshot.put("1", "A");
487         leadersSnapshot.put("2", "B");
488         leadersSnapshot.put("3", "C");
489
490         //clears leaders log
491         actorContext.getReplicatedLog().removeFrom(0);
492
493         final int commitIndex = 3;
494         final int snapshotIndex = 2;
495         final int newEntryIndex = 4;
496         final int snapshotTerm = 1;
497         final int currentTerm = 2;
498
499         // set the snapshot variables in replicatedlog
500         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
501         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
502         actorContext.setCommitIndex(commitIndex);
503         //set follower timeout to 2 mins, helps during debugging
504         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
505
506         leader = new Leader(actorContext);
507
508         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
509         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
510
511         // new entry
512         ReplicatedLogImplEntry entry =
513                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
514                         new MockRaftActorContext.MockPayload("D"));
515
516         //update follower timestamp
517         leader.markFollowerActive(FOLLOWER_ID);
518
519         ByteString bs = toByteString(leadersSnapshot);
520         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
521                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
522         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
523         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
524
525         //send first chunk and no InstallSnapshotReply received yet
526         fts.getNextChunk();
527         fts.incrementChunkIndex();
528
529         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
530                 TimeUnit.MILLISECONDS);
531
532         leader.handleMessage(leaderActor, new SendHeartBeat());
533
534         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
535
536         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
537
538         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
539
540         //InstallSnapshotReply received
541         fts.markSendStatus(true);
542
543         leader.handleMessage(leaderActor, new SendHeartBeat());
544
545         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
546
547         assertEquals(commitIndex, is.getLastIncludedIndex());
548     }
549
550     @Test
551     public void testSendAppendEntriesSnapshotScenario() throws Exception {
552         logStart("testSendAppendEntriesSnapshotScenario");
553
554         MockRaftActorContext actorContext = createActorContextWithFollower();
555
556         Map<String, String> leadersSnapshot = new HashMap<>();
557         leadersSnapshot.put("1", "A");
558         leadersSnapshot.put("2", "B");
559         leadersSnapshot.put("3", "C");
560
561         //clears leaders log
562         actorContext.getReplicatedLog().removeFrom(0);
563
564         final int followersLastIndex = 2;
565         final int snapshotIndex = 3;
566         final int newEntryIndex = 4;
567         final int snapshotTerm = 1;
568         final int currentTerm = 2;
569
570         // set the snapshot variables in replicatedlog
571         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
572         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
573         actorContext.setCommitIndex(followersLastIndex);
574
575         leader = new Leader(actorContext);
576
577         // Leader will send an immediate heartbeat - ignore it.
578         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
579
580         // new entry
581         ReplicatedLogImplEntry entry =
582                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
583                         new MockRaftActorContext.MockPayload("D"));
584
585         actorContext.getReplicatedLog().append(entry);
586
587         //update follower timestamp
588         leader.markFollowerActive(FOLLOWER_ID);
589
590         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
591         RaftActorBehavior raftBehavior = leader.handleMessage(
592                 leaderActor, new Replicate(null, "state-id", entry));
593
594         assertTrue(raftBehavior instanceof Leader);
595
596         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
597     }
598
599     @Test
600     public void testInitiateInstallSnapshot() throws Exception {
601         logStart("testInitiateInstallSnapshot");
602
603         MockRaftActorContext actorContext = createActorContextWithFollower();
604
605         //clears leaders log
606         actorContext.getReplicatedLog().removeFrom(0);
607
608         final int followersLastIndex = 2;
609         final int snapshotIndex = 3;
610         final int newEntryIndex = 4;
611         final int snapshotTerm = 1;
612         final int currentTerm = 2;
613
614         // set the snapshot variables in replicatedlog
615         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
616         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
617         actorContext.setLastApplied(3);
618         actorContext.setCommitIndex(followersLastIndex);
619
620         leader = new Leader(actorContext);
621
622         // Leader will send an immediate heartbeat - ignore it.
623         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
624
625         // set the snapshot as absent and check if capture-snapshot is invoked.
626         leader.setSnapshot(null);
627
628         // new entry
629         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
630                 new MockRaftActorContext.MockPayload("D"));
631
632         actorContext.getReplicatedLog().append(entry);
633
634         //update follower timestamp
635         leader.markFollowerActive(FOLLOWER_ID);
636
637         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
638
639         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
640
641         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
642
643         assertTrue(cs.isInstallSnapshotInitiated());
644         assertEquals(3, cs.getLastAppliedIndex());
645         assertEquals(1, cs.getLastAppliedTerm());
646         assertEquals(4, cs.getLastIndex());
647         assertEquals(2, cs.getLastTerm());
648
649         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
650         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
651
652         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
653     }
654
655     @Test
656     public void testInitiateForceInstallSnapshot() throws Exception {
657         logStart("testInitiateForceInstallSnapshot");
658
659         MockRaftActorContext actorContext = createActorContextWithFollower();
660
661         final int followersLastIndex = 2;
662         final int snapshotIndex = -1;
663         final int newEntryIndex = 4;
664         final int snapshotTerm = -1;
665         final int currentTerm = 2;
666
667         // set the snapshot variables in replicatedlog
668         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
669         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
670         actorContext.setLastApplied(3);
671         actorContext.setCommitIndex(followersLastIndex);
672
673         actorContext.getReplicatedLog().removeFrom(0);
674
675         leader = new Leader(actorContext);
676
677         // Leader will send an immediate heartbeat - ignore it.
678         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
679
680         // set the snapshot as absent and check if capture-snapshot is invoked.
681         leader.setSnapshot(null);
682
683         for(int i=0;i<4;i++) {
684             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
685                     new MockRaftActorContext.MockPayload("X" + i)));
686         }
687
688         // new entry
689         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
690                 new MockRaftActorContext.MockPayload("D"));
691
692         actorContext.getReplicatedLog().append(entry);
693
694         //update follower timestamp
695         leader.markFollowerActive(FOLLOWER_ID);
696
697         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
698         // installed with a SendInstallSnapshot
699         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
700
701         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
702
703         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
704
705         assertTrue(cs.isInstallSnapshotInitiated());
706         assertEquals(3, cs.getLastAppliedIndex());
707         assertEquals(1, cs.getLastAppliedTerm());
708         assertEquals(4, cs.getLastIndex());
709         assertEquals(2, cs.getLastTerm());
710
711         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
712         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
713
714         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
715     }
716
717
718     @Test
719     public void testInstallSnapshot() throws Exception {
720         logStart("testInstallSnapshot");
721
722         MockRaftActorContext actorContext = createActorContextWithFollower();
723
724         Map<String, String> leadersSnapshot = new HashMap<>();
725         leadersSnapshot.put("1", "A");
726         leadersSnapshot.put("2", "B");
727         leadersSnapshot.put("3", "C");
728
729         //clears leaders log
730         actorContext.getReplicatedLog().removeFrom(0);
731
732         final int lastAppliedIndex = 3;
733         final int snapshotIndex = 2;
734         final int snapshotTerm = 1;
735         final int currentTerm = 2;
736
737         // set the snapshot variables in replicatedlog
738         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
739         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
740         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
741         actorContext.setCommitIndex(lastAppliedIndex);
742         actorContext.setLastApplied(lastAppliedIndex);
743
744         leader = new Leader(actorContext);
745
746         // Initial heartbeat.
747         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
748
749         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
750         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
751
752         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
753                 Collections.<ReplicatedLogEntry>emptyList(),
754                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
755
756         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
757
758         assertTrue(raftBehavior instanceof Leader);
759
760         // check if installsnapshot gets called with the correct values.
761
762         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
763
764         assertNotNull(installSnapshot.getData());
765         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
766         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
767
768         assertEquals(currentTerm, installSnapshot.getTerm());
769     }
770
771     @Test
772     public void testForceInstallSnapshot() throws Exception {
773         logStart("testForceInstallSnapshot");
774
775         MockRaftActorContext actorContext = createActorContextWithFollower();
776
777         Map<String, String> leadersSnapshot = new HashMap<>();
778         leadersSnapshot.put("1", "A");
779         leadersSnapshot.put("2", "B");
780         leadersSnapshot.put("3", "C");
781
782         final int lastAppliedIndex = 3;
783         final int snapshotIndex = -1;
784         final int snapshotTerm = -1;
785         final int currentTerm = 2;
786
787         // set the snapshot variables in replicatedlog
788         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
789         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
790         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
791         actorContext.setCommitIndex(lastAppliedIndex);
792         actorContext.setLastApplied(lastAppliedIndex);
793
794         leader = new Leader(actorContext);
795
796         // Initial heartbeat.
797         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
798
799         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
800         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
801
802         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
803                 Collections.<ReplicatedLogEntry>emptyList(),
804                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
805
806         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
807
808         assertTrue(raftBehavior instanceof Leader);
809
810         // check if installsnapshot gets called with the correct values.
811
812         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
813
814         assertNotNull(installSnapshot.getData());
815         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
816         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
817
818         assertEquals(currentTerm, installSnapshot.getTerm());
819     }
820
821     @Test
822     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
823         logStart("testHandleInstallSnapshotReplyLastChunk");
824
825         MockRaftActorContext actorContext = createActorContextWithFollower();
826
827         final int commitIndex = 3;
828         final int snapshotIndex = 2;
829         final int snapshotTerm = 1;
830         final int currentTerm = 2;
831
832         actorContext.setCommitIndex(commitIndex);
833
834         leader = new Leader(actorContext);
835
836         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
837         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
838
839         // Ignore initial heartbeat.
840         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
841
842         Map<String, String> leadersSnapshot = new HashMap<>();
843         leadersSnapshot.put("1", "A");
844         leadersSnapshot.put("2", "B");
845         leadersSnapshot.put("3", "C");
846
847         // set the snapshot variables in replicatedlog
848
849         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
850         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
851         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
852
853         ByteString bs = toByteString(leadersSnapshot);
854         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
855                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
856         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
857         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
858         while(!fts.isLastChunk(fts.getChunkIndex())) {
859             fts.getNextChunk();
860             fts.incrementChunkIndex();
861         }
862
863         //clears leaders log
864         actorContext.getReplicatedLog().removeFrom(0);
865
866         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
867                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
868
869         assertTrue(raftBehavior instanceof Leader);
870
871         assertEquals(0, leader.followerSnapshotSize());
872         assertEquals(1, leader.followerLogSize());
873         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
874         assertNotNull(fli);
875         assertEquals(commitIndex, fli.getMatchIndex());
876         assertEquals(commitIndex + 1, fli.getNextIndex());
877     }
878
879     @Test
880     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
881         logStart("testSendSnapshotfromInstallSnapshotReply");
882
883         MockRaftActorContext actorContext = createActorContextWithFollower();
884
885         final int commitIndex = 3;
886         final int snapshotIndex = 2;
887         final int snapshotTerm = 1;
888         final int currentTerm = 2;
889
890         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
891             @Override
892             public int getSnapshotChunkSize() {
893                 return 50;
894             }
895         };
896         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
897         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
898
899         actorContext.setConfigParams(configParams);
900         actorContext.setCommitIndex(commitIndex);
901
902         leader = new Leader(actorContext);
903
904         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
905         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
906
907         Map<String, String> leadersSnapshot = new HashMap<>();
908         leadersSnapshot.put("1", "A");
909         leadersSnapshot.put("2", "B");
910         leadersSnapshot.put("3", "C");
911
912         // set the snapshot variables in replicatedlog
913         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916
917         ByteString bs = toByteString(leadersSnapshot);
918         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
919                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
920         leader.setSnapshot(snapshot);
921
922         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
923
924         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
925
926         assertEquals(1, installSnapshot.getChunkIndex());
927         assertEquals(3, installSnapshot.getTotalChunks());
928
929         followerActor.underlyingActor().clear();
930         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
931                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
932
933         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
934
935         assertEquals(2, installSnapshot.getChunkIndex());
936         assertEquals(3, installSnapshot.getTotalChunks());
937
938         followerActor.underlyingActor().clear();
939         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
940                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
941
942         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
943
944         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
945         followerActor.underlyingActor().clear();
946         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
947                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
948
949         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
950
951         Assert.assertNull(installSnapshot);
952     }
953
954
955     @Test
956     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
957         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
958
959         MockRaftActorContext actorContext = createActorContextWithFollower();
960
961         final int commitIndex = 3;
962         final int snapshotIndex = 2;
963         final int snapshotTerm = 1;
964         final int currentTerm = 2;
965
966         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
967             @Override
968             public int getSnapshotChunkSize() {
969                 return 50;
970             }
971         });
972
973         actorContext.setCommitIndex(commitIndex);
974
975         leader = new Leader(actorContext);
976
977         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
978         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
979
980         Map<String, String> leadersSnapshot = new HashMap<>();
981         leadersSnapshot.put("1", "A");
982         leadersSnapshot.put("2", "B");
983         leadersSnapshot.put("3", "C");
984
985         // set the snapshot variables in replicatedlog
986         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
987         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
988         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
989
990         ByteString bs = toByteString(leadersSnapshot);
991         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
992                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
993         leader.setSnapshot(snapshot);
994
995         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
996         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
997
998         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
999
1000         assertEquals(1, installSnapshot.getChunkIndex());
1001         assertEquals(3, installSnapshot.getTotalChunks());
1002
1003         followerActor.underlyingActor().clear();
1004
1005         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1006                 FOLLOWER_ID, -1, false));
1007
1008         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1009                 TimeUnit.MILLISECONDS);
1010
1011         leader.handleMessage(leaderActor, new SendHeartBeat());
1012
1013         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1014
1015         assertEquals(1, installSnapshot.getChunkIndex());
1016         assertEquals(3, installSnapshot.getTotalChunks());
1017     }
1018
1019     @Test
1020     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1021         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1022
1023         MockRaftActorContext actorContext = createActorContextWithFollower();
1024
1025         final int commitIndex = 3;
1026         final int snapshotIndex = 2;
1027         final int snapshotTerm = 1;
1028         final int currentTerm = 2;
1029
1030         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1031             @Override
1032             public int getSnapshotChunkSize() {
1033                 return 50;
1034             }
1035         });
1036
1037         actorContext.setCommitIndex(commitIndex);
1038
1039         leader = new Leader(actorContext);
1040
1041         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1042         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1043
1044         Map<String, String> leadersSnapshot = new HashMap<>();
1045         leadersSnapshot.put("1", "A");
1046         leadersSnapshot.put("2", "B");
1047         leadersSnapshot.put("3", "C");
1048
1049         // set the snapshot variables in replicatedlog
1050         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1051         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1052         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1053
1054         ByteString bs = toByteString(leadersSnapshot);
1055         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1056                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1057         leader.setSnapshot(snapshot);
1058
1059         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1060
1061         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1062
1063         assertEquals(1, installSnapshot.getChunkIndex());
1064         assertEquals(3, installSnapshot.getTotalChunks());
1065         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1066
1067         int hashCode = installSnapshot.getData().hashCode();
1068
1069         followerActor.underlyingActor().clear();
1070
1071         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1072                 FOLLOWER_ID, 1, true));
1073
1074         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1075
1076         assertEquals(2, installSnapshot.getChunkIndex());
1077         assertEquals(3, installSnapshot.getTotalChunks());
1078         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1079     }
1080
1081     @Test
1082     public void testFollowerToSnapshotLogic() {
1083         logStart("testFollowerToSnapshotLogic");
1084
1085         MockRaftActorContext actorContext = createActorContext();
1086
1087         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1088             @Override
1089             public int getSnapshotChunkSize() {
1090                 return 50;
1091             }
1092         });
1093
1094         leader = new Leader(actorContext);
1095
1096         Map<String, String> leadersSnapshot = new HashMap<>();
1097         leadersSnapshot.put("1", "A");
1098         leadersSnapshot.put("2", "B");
1099         leadersSnapshot.put("3", "C");
1100
1101         ByteString bs = toByteString(leadersSnapshot);
1102         byte[] barray = bs.toByteArray();
1103
1104         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1105         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1106
1107         assertEquals(bs.size(), barray.length);
1108
1109         int chunkIndex=0;
1110         for (int i=0; i < barray.length; i = i + 50) {
1111             int j = i + 50;
1112             chunkIndex++;
1113
1114             if (i + 50 > barray.length) {
1115                 j = barray.length;
1116             }
1117
1118             ByteString chunk = fts.getNextChunk();
1119             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1120             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1121
1122             fts.markSendStatus(true);
1123             if (!fts.isLastChunk(chunkIndex)) {
1124                 fts.incrementChunkIndex();
1125             }
1126         }
1127
1128         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1129     }
1130
1131     @Override protected RaftActorBehavior createBehavior(
1132         RaftActorContext actorContext) {
1133         return new Leader(actorContext);
1134     }
1135
1136     @Override
1137     protected MockRaftActorContext createActorContext() {
1138         return createActorContext(leaderActor);
1139     }
1140
1141     @Override
1142     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1143         return createActorContext(LEADER_ID, actorRef);
1144     }
1145
1146     private MockRaftActorContext createActorContextWithFollower() {
1147         MockRaftActorContext actorContext = createActorContext();
1148         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1149                 followerActor.path().toString()).build());
1150         return actorContext;
1151     }
1152
1153     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1154         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1155         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1156         configParams.setElectionTimeoutFactor(100000);
1157         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1158         context.setConfigParams(configParams);
1159         context.setPayloadVersion(payloadVersion);
1160         return context;
1161     }
1162
1163     private MockRaftActorContext createFollowerActorContextWithLeader() {
1164         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1165         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1166         followerConfig.setElectionTimeoutFactor(10000);
1167         followerActorContext.setConfigParams(followerConfig);
1168         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1169         return followerActorContext;
1170     }
1171
1172     @Test
1173     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1174         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1175
1176         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1177
1178         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1179
1180         Follower follower = new Follower(followerActorContext);
1181         followerActor.underlyingActor().setBehavior(follower);
1182
1183         Map<String, String> peerAddresses = new HashMap<>();
1184         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1185
1186         leaderActorContext.setPeerAddresses(peerAddresses);
1187
1188         leaderActorContext.getReplicatedLog().removeFrom(0);
1189
1190         //create 3 entries
1191         leaderActorContext.setReplicatedLog(
1192                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1193
1194         leaderActorContext.setCommitIndex(1);
1195
1196         followerActorContext.getReplicatedLog().removeFrom(0);
1197
1198         // follower too has the exact same log entries and has the same commit index
1199         followerActorContext.setReplicatedLog(
1200                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1201
1202         followerActorContext.setCommitIndex(1);
1203
1204         leader = new Leader(leaderActorContext);
1205
1206         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1207
1208         assertEquals(1, appendEntries.getLeaderCommit());
1209         assertEquals(0, appendEntries.getEntries().size());
1210         assertEquals(0, appendEntries.getPrevLogIndex());
1211
1212         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1213                 leaderActor, AppendEntriesReply.class);
1214
1215         assertEquals(2, appendEntriesReply.getLogLastIndex());
1216         assertEquals(1, appendEntriesReply.getLogLastTerm());
1217
1218         // follower returns its next index
1219         assertEquals(2, appendEntriesReply.getLogLastIndex());
1220         assertEquals(1, appendEntriesReply.getLogLastTerm());
1221
1222         follower.close();
1223     }
1224
1225     @Test
1226     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1227         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1228
1229         MockRaftActorContext leaderActorContext = createActorContext();
1230
1231         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1232         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1233
1234         Follower follower = new Follower(followerActorContext);
1235         followerActor.underlyingActor().setBehavior(follower);
1236
1237         Map<String, String> leaderPeerAddresses = new HashMap<>();
1238         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1239
1240         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1241
1242         leaderActorContext.getReplicatedLog().removeFrom(0);
1243
1244         leaderActorContext.setReplicatedLog(
1245                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1246
1247         leaderActorContext.setCommitIndex(1);
1248
1249         followerActorContext.getReplicatedLog().removeFrom(0);
1250
1251         followerActorContext.setReplicatedLog(
1252                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1253
1254         // follower has the same log entries but its commit index > leaders commit index
1255         followerActorContext.setCommitIndex(2);
1256
1257         leader = new Leader(leaderActorContext);
1258
1259         // Initial heartbeat
1260         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1261
1262         assertEquals(1, appendEntries.getLeaderCommit());
1263         assertEquals(0, appendEntries.getEntries().size());
1264         assertEquals(0, appendEntries.getPrevLogIndex());
1265
1266         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1267                 leaderActor, AppendEntriesReply.class);
1268
1269         assertEquals(2, appendEntriesReply.getLogLastIndex());
1270         assertEquals(1, appendEntriesReply.getLogLastTerm());
1271
1272         leaderActor.underlyingActor().setBehavior(follower);
1273         leader.handleMessage(followerActor, appendEntriesReply);
1274
1275         leaderActor.underlyingActor().clear();
1276         followerActor.underlyingActor().clear();
1277
1278         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1279                 TimeUnit.MILLISECONDS);
1280
1281         leader.handleMessage(leaderActor, new SendHeartBeat());
1282
1283         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1284
1285         assertEquals(2, appendEntries.getLeaderCommit());
1286         assertEquals(0, appendEntries.getEntries().size());
1287         assertEquals(2, appendEntries.getPrevLogIndex());
1288
1289         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1290
1291         assertEquals(2, appendEntriesReply.getLogLastIndex());
1292         assertEquals(1, appendEntriesReply.getLogLastTerm());
1293
1294         assertEquals(2, followerActorContext.getCommitIndex());
1295
1296         follower.close();
1297     }
1298
1299     @Test
1300     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1301         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1302
1303         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1304         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1305                 new FiniteDuration(1000, TimeUnit.SECONDS));
1306
1307         leaderActorContext.setReplicatedLog(
1308                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309         long leaderCommitIndex = 2;
1310         leaderActorContext.setCommitIndex(leaderCommitIndex);
1311         leaderActorContext.setLastApplied(leaderCommitIndex);
1312
1313         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1314         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1315
1316         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1317
1318         followerActorContext.setReplicatedLog(
1319                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1320         followerActorContext.setCommitIndex(0);
1321         followerActorContext.setLastApplied(0);
1322
1323         Follower follower = new Follower(followerActorContext);
1324         followerActor.underlyingActor().setBehavior(follower);
1325
1326         leader = new Leader(leaderActorContext);
1327
1328         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1329         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1330
1331         MessageCollectorActor.clearMessages(followerActor);
1332         MessageCollectorActor.clearMessages(leaderActor);
1333
1334         // Verify initial AppendEntries sent with the leader's current commit index.
1335         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1336         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1337         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1338
1339         leaderActor.underlyingActor().setBehavior(leader);
1340
1341         leader.handleMessage(followerActor, appendEntriesReply);
1342
1343         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1344         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1345
1346         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1347         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1348         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1349
1350         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1351         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1352                 appendEntries.getEntries().get(0).getData());
1353         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1354         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1355                 appendEntries.getEntries().get(1).getData());
1356
1357         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1358         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1359
1360         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1361
1362         ApplyState applyState = applyStateList.get(0);
1363         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1364         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1365         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1366                 applyState.getReplicatedLogEntry().getData());
1367
1368         applyState = applyStateList.get(1);
1369         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1370         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1371         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1372                 applyState.getReplicatedLogEntry().getData());
1373
1374         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1375         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1376     }
1377
1378     @Test
1379     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1380         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1381
1382         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1383         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1384                 new FiniteDuration(1000, TimeUnit.SECONDS));
1385
1386         leaderActorContext.setReplicatedLog(
1387                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1388         long leaderCommitIndex = 1;
1389         leaderActorContext.setCommitIndex(leaderCommitIndex);
1390         leaderActorContext.setLastApplied(leaderCommitIndex);
1391
1392         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1393         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1394
1395         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1396
1397         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1398         followerActorContext.setCommitIndex(-1);
1399         followerActorContext.setLastApplied(-1);
1400
1401         Follower follower = new Follower(followerActorContext);
1402         followerActor.underlyingActor().setBehavior(follower);
1403
1404         leader = new Leader(leaderActorContext);
1405
1406         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1407         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1408
1409         MessageCollectorActor.clearMessages(followerActor);
1410         MessageCollectorActor.clearMessages(leaderActor);
1411
1412         // Verify initial AppendEntries sent with the leader's current commit index.
1413         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1414         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1415         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1416
1417         leaderActor.underlyingActor().setBehavior(leader);
1418
1419         leader.handleMessage(followerActor, appendEntriesReply);
1420
1421         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1422         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1423
1424         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1425         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1426         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1427
1428         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1429         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1430                 appendEntries.getEntries().get(0).getData());
1431         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1432         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1433                 appendEntries.getEntries().get(1).getData());
1434
1435         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1436         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1437
1438         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1439
1440         ApplyState applyState = applyStateList.get(0);
1441         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1442         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1443         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1444                 applyState.getReplicatedLogEntry().getData());
1445
1446         applyState = applyStateList.get(1);
1447         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1448         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1449         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1450                 applyState.getReplicatedLogEntry().getData());
1451
1452         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1453         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1454     }
1455
1456     @Test
1457     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1458         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1459
1460         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1461         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1462                 new FiniteDuration(1000, TimeUnit.SECONDS));
1463
1464         leaderActorContext.setReplicatedLog(
1465                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1466         long leaderCommitIndex = 1;
1467         leaderActorContext.setCommitIndex(leaderCommitIndex);
1468         leaderActorContext.setLastApplied(leaderCommitIndex);
1469
1470         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1471         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1472
1473         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1474
1475         followerActorContext.setReplicatedLog(
1476                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1477         followerActorContext.setCommitIndex(-1);
1478         followerActorContext.setLastApplied(-1);
1479
1480         Follower follower = new Follower(followerActorContext);
1481         followerActor.underlyingActor().setBehavior(follower);
1482
1483         leader = new Leader(leaderActorContext);
1484
1485         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1486         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1487
1488         MessageCollectorActor.clearMessages(followerActor);
1489         MessageCollectorActor.clearMessages(leaderActor);
1490
1491         // Verify initial AppendEntries sent with the leader's current commit index.
1492         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1493         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1494         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1495
1496         leaderActor.underlyingActor().setBehavior(leader);
1497
1498         leader.handleMessage(followerActor, appendEntriesReply);
1499
1500         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1501         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1502
1503         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1504         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1505         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1506
1507         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1508         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1509         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1510                 appendEntries.getEntries().get(0).getData());
1511         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1512         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1513         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1514                 appendEntries.getEntries().get(1).getData());
1515
1516         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1517         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1518
1519         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1520
1521         ApplyState applyState = applyStateList.get(0);
1522         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1523         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1524         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1525                 applyState.getReplicatedLogEntry().getData());
1526
1527         applyState = applyStateList.get(1);
1528         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1529         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1530         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1531                 applyState.getReplicatedLogEntry().getData());
1532
1533         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1534         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1535         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1536     }
1537
1538     @Test
1539     public void testHandleAppendEntriesReplyWithNewerTerm(){
1540         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1541
1542         MockRaftActorContext leaderActorContext = createActorContext();
1543         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1544                 new FiniteDuration(10000, TimeUnit.SECONDS));
1545
1546         leaderActorContext.setReplicatedLog(
1547                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1548
1549         leader = new Leader(leaderActorContext);
1550         leaderActor.underlyingActor().setBehavior(leader);
1551         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1552
1553         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1554
1555         assertEquals(false, appendEntriesReply.isSuccess());
1556         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1557
1558         MessageCollectorActor.clearMessages(leaderActor);
1559     }
1560
1561     @Test
1562     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1563         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1564
1565         MockRaftActorContext leaderActorContext = createActorContext();
1566         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1567                 new FiniteDuration(10000, TimeUnit.SECONDS));
1568
1569         leaderActorContext.setReplicatedLog(
1570                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1571         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1572
1573         leader = new Leader(leaderActorContext);
1574         leaderActor.underlyingActor().setBehavior(leader);
1575         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1576
1577         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1578
1579         assertEquals(false, appendEntriesReply.isSuccess());
1580         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1581
1582         MessageCollectorActor.clearMessages(leaderActor);
1583     }
1584
1585     @Test
1586     public void testHandleAppendEntriesReplySuccess() throws Exception {
1587         logStart("testHandleAppendEntriesReplySuccess");
1588
1589         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1590
1591         leaderActorContext.setReplicatedLog(
1592                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1593
1594         leaderActorContext.setCommitIndex(1);
1595         leaderActorContext.setLastApplied(1);
1596         leaderActorContext.getTermInformation().update(1, "leader");
1597
1598         leader = new Leader(leaderActorContext);
1599
1600         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1601
1602         short payloadVersion = 5;
1603         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1604
1605         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1606
1607         assertEquals(RaftState.Leader, raftActorBehavior.state());
1608
1609         assertEquals(2, leaderActorContext.getCommitIndex());
1610
1611         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1612                 leaderActor, ApplyJournalEntries.class);
1613
1614         assertEquals(2, leaderActorContext.getLastApplied());
1615
1616         assertEquals(2, applyJournalEntries.getToIndex());
1617
1618         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1619                 ApplyState.class);
1620
1621         assertEquals(1,applyStateList.size());
1622
1623         ApplyState applyState = applyStateList.get(0);
1624
1625         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1626
1627         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1628         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1629     }
1630
1631     @Test
1632     public void testHandleAppendEntriesReplyUnknownFollower(){
1633         logStart("testHandleAppendEntriesReplyUnknownFollower");
1634
1635         MockRaftActorContext leaderActorContext = createActorContext();
1636
1637         leader = new Leader(leaderActorContext);
1638
1639         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1640
1641         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1642
1643         assertEquals(RaftState.Leader, raftActorBehavior.state());
1644     }
1645
1646     @Test
1647     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1648         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1649
1650         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1651         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1652                 new FiniteDuration(1000, TimeUnit.SECONDS));
1653         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1654
1655         leaderActorContext.setReplicatedLog(
1656                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1657         long leaderCommitIndex = 3;
1658         leaderActorContext.setCommitIndex(leaderCommitIndex);
1659         leaderActorContext.setLastApplied(leaderCommitIndex);
1660
1661         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1662         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1663         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1664         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1665
1666         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1667
1668         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1669         followerActorContext.setCommitIndex(-1);
1670         followerActorContext.setLastApplied(-1);
1671
1672         Follower follower = new Follower(followerActorContext);
1673         followerActor.underlyingActor().setBehavior(follower);
1674
1675         leader = new Leader(leaderActorContext);
1676
1677         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1678         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1679
1680         MessageCollectorActor.clearMessages(followerActor);
1681         MessageCollectorActor.clearMessages(leaderActor);
1682
1683         // Verify initial AppendEntries sent with the leader's current commit index.
1684         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1685         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1686         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1687
1688         leaderActor.underlyingActor().setBehavior(leader);
1689
1690         leader.handleMessage(followerActor, appendEntriesReply);
1691
1692         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1693         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1694
1695         appendEntries = appendEntriesList.get(0);
1696         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1697         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1698         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1699
1700         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1701         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1702                 appendEntries.getEntries().get(0).getData());
1703         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1704         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1705                 appendEntries.getEntries().get(1).getData());
1706
1707         appendEntries = appendEntriesList.get(1);
1708         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1709         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1710         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1711
1712         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1713         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1714                 appendEntries.getEntries().get(0).getData());
1715         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1716         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1717                 appendEntries.getEntries().get(1).getData());
1718
1719         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1720         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1721
1722         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1723
1724         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1725         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1726     }
1727
1728     @Test
1729     public void testHandleRequestVoteReply(){
1730         logStart("testHandleRequestVoteReply");
1731
1732         MockRaftActorContext leaderActorContext = createActorContext();
1733
1734         leader = new Leader(leaderActorContext);
1735
1736         // Should be a no-op.
1737         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1738                 new RequestVoteReply(1, true));
1739
1740         assertEquals(RaftState.Leader, raftActorBehavior.state());
1741
1742         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1743
1744         assertEquals(RaftState.Leader, raftActorBehavior.state());
1745     }
1746
1747     @Test
1748     public void testIsolatedLeaderCheckNoFollowers() {
1749         logStart("testIsolatedLeaderCheckNoFollowers");
1750
1751         MockRaftActorContext leaderActorContext = createActorContext();
1752
1753         leader = new Leader(leaderActorContext);
1754         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1755         Assert.assertTrue(behavior instanceof Leader);
1756     }
1757
1758     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1759         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1760         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1761
1762         MockRaftActorContext leaderActorContext = createActorContext();
1763
1764         Map<String, String> peerAddresses = new HashMap<>();
1765         peerAddresses.put("follower-1", followerActor1.path().toString());
1766         peerAddresses.put("follower-2", followerActor2.path().toString());
1767
1768         leaderActorContext.setPeerAddresses(peerAddresses);
1769         leaderActorContext.setRaftPolicy(raftPolicy);
1770
1771         leader = new Leader(leaderActorContext);
1772
1773         leader.markFollowerActive("follower-1");
1774         leader.markFollowerActive("follower-2");
1775         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1776         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1777                 behavior instanceof Leader);
1778
1779         // kill 1 follower and verify if that got killed
1780         final JavaTestKit probe = new JavaTestKit(getSystem());
1781         probe.watch(followerActor1);
1782         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1783         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1784         assertEquals(termMsg1.getActor(), followerActor1);
1785
1786         leader.markFollowerInActive("follower-1");
1787         leader.markFollowerActive("follower-2");
1788         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1789         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1790                 behavior instanceof Leader);
1791
1792         // kill 2nd follower and leader should change to Isolated leader
1793         followerActor2.tell(PoisonPill.getInstance(), null);
1794         probe.watch(followerActor2);
1795         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1796         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1797         assertEquals(termMsg2.getActor(), followerActor2);
1798
1799         leader.markFollowerInActive("follower-2");
1800         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1801     }
1802
1803     @Test
1804     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1805         logStart("testIsolatedLeaderCheckTwoFollowers");
1806
1807         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1808
1809         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1810             behavior instanceof IsolatedLeader);
1811     }
1812
1813     @Test
1814     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1815         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1816
1817         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1818
1819         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1820                 behavior instanceof Leader);
1821     }
1822
1823     @Test
1824     public void testLaggingFollowerStarvation() throws Exception {
1825         logStart("testLaggingFollowerStarvation");
1826         new JavaTestKit(getSystem()) {{
1827             String leaderActorId = actorFactory.generateActorId("leader");
1828             String follower1ActorId = actorFactory.generateActorId("follower");
1829             String follower2ActorId = actorFactory.generateActorId("follower");
1830
1831             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1832                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1833             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1834             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1835
1836             MockRaftActorContext leaderActorContext =
1837                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1838
1839             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1840             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1841             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1842
1843             leaderActorContext.setConfigParams(configParams);
1844
1845             leaderActorContext.setReplicatedLog(
1846                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1847
1848             Map<String, String> peerAddresses = new HashMap<>();
1849             peerAddresses.put(follower1ActorId,
1850                     follower1Actor.path().toString());
1851             peerAddresses.put(follower2ActorId,
1852                     follower2Actor.path().toString());
1853
1854             leaderActorContext.setPeerAddresses(peerAddresses);
1855             leaderActorContext.getTermInformation().update(1, leaderActorId);
1856
1857             RaftActorBehavior leader = createBehavior(leaderActorContext);
1858
1859             leaderActor.underlyingActor().setBehavior(leader);
1860
1861             for(int i=1;i<6;i++) {
1862                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1863                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1864                 assertTrue(newBehavior == leader);
1865                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1866             }
1867
1868             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1869             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1870
1871             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1872                     heartbeats.size() > 1);
1873
1874             // Check if follower-2 got AppendEntries during this time and was not starved
1875             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1876
1877             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1878                     appendEntries.size() > 1);
1879
1880         }};
1881     }
1882
1883     @Test
1884     public void testReplicationConsensusWithNonVotingFollower() {
1885         logStart("testReplicationConsensusWithNonVotingFollower");
1886
1887         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1888         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1889                 new FiniteDuration(1000, TimeUnit.SECONDS));
1890
1891         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1892
1893         String nonVotingFollowerId = "nonvoting-follower";
1894         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1895                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1896
1897         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1898
1899         leader = new Leader(leaderActorContext);
1900
1901         // Ignore initial heartbeats
1902         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1903         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1904
1905         MessageCollectorActor.clearMessages(followerActor);
1906         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1907         MessageCollectorActor.clearMessages(leaderActor);
1908
1909         // Send a Replicate message and wait for AppendEntries.
1910         sendReplicate(leaderActorContext, 0);
1911
1912         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1913         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1914
1915         // Send reply only from the voting follower and verify consensus via ApplyState.
1916         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1917
1918         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1919
1920         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1921
1922         MessageCollectorActor.clearMessages(followerActor);
1923         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1924         MessageCollectorActor.clearMessages(leaderActor);
1925
1926         // Send another Replicate message
1927         sendReplicate(leaderActorContext, 1);
1928
1929         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1930         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1931                 AppendEntries.class);
1932         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1933         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1934
1935         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1936         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1937
1938         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
1939
1940         // Send reply from the voting follower and verify consensus.
1941         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1942
1943         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1944     }
1945
1946     @Override
1947     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1948             ActorRef actorRef, RaftRPC rpc) throws Exception {
1949         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1950         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1951     }
1952
1953     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1954
1955         private final long electionTimeOutIntervalMillis;
1956         private final int snapshotChunkSize;
1957
1958         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1959             super();
1960             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1961             this.snapshotChunkSize = snapshotChunkSize;
1962         }
1963
1964         @Override
1965         public FiniteDuration getElectionTimeOutInterval() {
1966             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1967         }
1968
1969         @Override
1970         public int getSnapshotChunkSize() {
1971             return snapshotChunkSize;
1972         }
1973     }
1974 }