Bug-4214 - Add support for configurable snapshot chunk size.
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
55 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
56 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import scala.concurrent.duration.FiniteDuration;
59
60 public class LeaderTest extends AbstractLeaderTest {
61
    // Raft member id used for the simulated follower in AppendEntriesReply messages and getFollower() lookups.
    static final String FOLLOWER_ID = "follower";
    // Raft member id for the leader under test (not referenced in the visible part of this class).
    public static final String LEADER_ID = "leader";

    // Test actor passed to handleMessage() as the sender of leader-side messages (SendHeartBeat, Replicate);
    // ApplyState messages are also collected from it.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; the tests read the AppendEntries/InstallSnapshot messages
    // it collects via MessageCollectorActor.
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Leader behavior under test; created by each test and closed in tearDown().
    private Leader leader;
    // NOTE(review): presumably the payload version expected in outgoing messages - one test shadows this
    // field with a local of the same name and value; confirm usage further down the file.
    private final short payloadVersion = 5;
73
74     @Override
75     @After
76     public void tearDown() throws Exception {
77         if(leader != null) {
78             leader.close();
79         }
80
81         super.tearDown();
82     }
83
84     @Test
85     public void testHandleMessageForUnknownMessage() throws Exception {
86         logStart("testHandleMessageForUnknownMessage");
87
88         leader = new Leader(createActorContext());
89
90         // handle message should return the Leader state when it receives an
91         // unknown message
92         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
93         Assert.assertTrue(behavior instanceof Leader);
94     }
95
96     @Test
97     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
98         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
99
100         MockRaftActorContext actorContext = createActorContextWithFollower();
101         short payloadVersion = (short)5;
102         actorContext.setPayloadVersion(payloadVersion);
103
104         long term = 1;
105         actorContext.getTermInformation().update(term, "");
106
107         leader = new Leader(actorContext);
108
109         // Leader should send an immediate heartbeat with no entries as follower is inactive.
110         long lastIndex = actorContext.getReplicatedLog().lastIndex();
111         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
112         assertEquals("getTerm", term, appendEntries.getTerm());
113         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
114         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
115         assertEquals("Entries size", 0, appendEntries.getEntries().size());
116         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
117
118         // The follower would normally reply - simulate that explicitly here.
119         leader.handleMessage(followerActor, new AppendEntriesReply(
120                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
121         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
122
123         followerActor.underlyingActor().clear();
124
125         // Sleep for the heartbeat interval so AppendEntries is sent.
126         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
127                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
128
129         leader.handleMessage(leaderActor, new SendHeartBeat());
130
131         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
132         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
133         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
134         assertEquals("Entries size", 1, appendEntries.getEntries().size());
135         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
136         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
137         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
138     }
139
140
141     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
142         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
143         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
144                 1, index, payload);
145         actorContext.getReplicatedLog().append(newEntry);
146         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
147     }
148
149     @Test
150     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
151         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
152
153         MockRaftActorContext actorContext = createActorContextWithFollower();
154
155         long term = 1;
156         actorContext.getTermInformation().update(term, "");
157
158         leader = new Leader(actorContext);
159
160         // Leader will send an immediate heartbeat - ignore it.
161         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
162
163         // The follower would normally reply - simulate that explicitly here.
164         long lastIndex = actorContext.getReplicatedLog().lastIndex();
165         leader.handleMessage(followerActor, new AppendEntriesReply(
166                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
167         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
168
169         followerActor.underlyingActor().clear();
170
171         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
172
173         // State should not change
174         assertTrue(raftBehavior instanceof Leader);
175
176         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
177         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
178         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
179         assertEquals("Entries size", 1, appendEntries.getEntries().size());
180         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
181         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
182         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
183         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
184     }
185
186     @Test
187     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
188         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
189
190         MockRaftActorContext actorContext = createActorContextWithFollower();
191         actorContext.setRaftPolicy(createRaftPolicy(true, true));
192
193         long term = 1;
194         actorContext.getTermInformation().update(term, "");
195
196         leader = new Leader(actorContext);
197
198         // Leader will send an immediate heartbeat - ignore it.
199         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
200
201         // The follower would normally reply - simulate that explicitly here.
202         long lastIndex = actorContext.getReplicatedLog().lastIndex();
203         leader.handleMessage(followerActor, new AppendEntriesReply(
204                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
205         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
206
207         followerActor.underlyingActor().clear();
208
209         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
210
211         // State should not change
212         assertTrue(raftBehavior instanceof Leader);
213
214         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
216         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
217         assertEquals("Entries size", 1, appendEntries.getEntries().size());
218         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
219         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
220         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
221         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
222     }
223
224     @Test
225     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
226         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
227
228         MockRaftActorContext actorContext = createActorContextWithFollower();
229         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
230             @Override
231             public FiniteDuration getHeartBeatInterval() {
232                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
233             }
234         });
235
236         long term = 1;
237         actorContext.getTermInformation().update(term, "");
238
239         leader = new Leader(actorContext);
240
241         // Leader will send an immediate heartbeat - ignore it.
242         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
243
244         // The follower would normally reply - simulate that explicitly here.
245         long lastIndex = actorContext.getReplicatedLog().lastIndex();
246         leader.handleMessage(followerActor, new AppendEntriesReply(
247                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
248         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
249
250         followerActor.underlyingActor().clear();
251
252         for(int i=0;i<5;i++) {
253             sendReplicate(actorContext, lastIndex+i+1);
254         }
255
256         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
257         // We expect only 1 message to be sent because of two reasons,
258         // - an append entries reply was not received
259         // - the heartbeat interval has not expired
260         // In this scenario if multiple messages are sent they would likely be duplicates
261         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
262     }
263
264     @Test
265     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
266         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
267
268         MockRaftActorContext actorContext = createActorContextWithFollower();
269         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
270             @Override
271             public FiniteDuration getHeartBeatInterval() {
272                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
273             }
274         });
275
276         long term = 1;
277         actorContext.getTermInformation().update(term, "");
278
279         leader = new Leader(actorContext);
280
281         // Leader will send an immediate heartbeat - ignore it.
282         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283
284         // The follower would normally reply - simulate that explicitly here.
285         long lastIndex = actorContext.getReplicatedLog().lastIndex();
286         leader.handleMessage(followerActor, new AppendEntriesReply(
287                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
288         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
289
290         followerActor.underlyingActor().clear();
291
292         for(int i=0;i<3;i++) {
293             sendReplicate(actorContext, lastIndex+i+1);
294             leader.handleMessage(followerActor, new AppendEntriesReply(
295                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
296
297         }
298
299         for(int i=3;i<5;i++) {
300             sendReplicate(actorContext, lastIndex + i + 1);
301         }
302
303         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
304         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
305         // get sent to the follower - but not the 5th
306         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
307
308         for(int i=0;i<4;i++) {
309             long expected = allMessages.get(i).getEntries().get(0).getIndex();
310             assertEquals(expected, i+2);
311         }
312     }
313
314     @Test
315     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
316         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
317
318         MockRaftActorContext actorContext = createActorContextWithFollower();
319         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
320             @Override
321             public FiniteDuration getHeartBeatInterval() {
322                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
323             }
324         });
325
326         long term = 1;
327         actorContext.getTermInformation().update(term, "");
328
329         leader = new Leader(actorContext);
330
331         // Leader will send an immediate heartbeat - ignore it.
332         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
333
334         // The follower would normally reply - simulate that explicitly here.
335         long lastIndex = actorContext.getReplicatedLog().lastIndex();
336         leader.handleMessage(followerActor, new AppendEntriesReply(
337                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
338         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
339
340         followerActor.underlyingActor().clear();
341
342         sendReplicate(actorContext, lastIndex+1);
343
344         // Wait slightly longer than heartbeat duration
345         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
346
347         leader.handleMessage(leaderActor, new SendHeartBeat());
348
349         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
350         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
351
352         assertEquals(1, allMessages.get(0).getEntries().size());
353         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
354         assertEquals(1, allMessages.get(1).getEntries().size());
355         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
356
357     }
358
359     @Test
360     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
361         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
362
363         MockRaftActorContext actorContext = createActorContextWithFollower();
364         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
365             @Override
366             public FiniteDuration getHeartBeatInterval() {
367                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
368             }
369         });
370
371         long term = 1;
372         actorContext.getTermInformation().update(term, "");
373
374         leader = new Leader(actorContext);
375
376         // Leader will send an immediate heartbeat - ignore it.
377         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
378
379         // The follower would normally reply - simulate that explicitly here.
380         long lastIndex = actorContext.getReplicatedLog().lastIndex();
381         leader.handleMessage(followerActor, new AppendEntriesReply(
382                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
383         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
384
385         followerActor.underlyingActor().clear();
386
387         for(int i=0;i<3;i++) {
388             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
389             leader.handleMessage(leaderActor, new SendHeartBeat());
390         }
391
392         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
394     }
395
396     @Test
397     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
398         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
399
400         MockRaftActorContext actorContext = createActorContextWithFollower();
401         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
402             @Override
403             public FiniteDuration getHeartBeatInterval() {
404                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
405             }
406         });
407
408         long term = 1;
409         actorContext.getTermInformation().update(term, "");
410
411         leader = new Leader(actorContext);
412
413         // Leader will send an immediate heartbeat - ignore it.
414         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
415
416         // The follower would normally reply - simulate that explicitly here.
417         long lastIndex = actorContext.getReplicatedLog().lastIndex();
418         leader.handleMessage(followerActor, new AppendEntriesReply(
419                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
421
422         followerActor.underlyingActor().clear();
423
424         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
425         leader.handleMessage(leaderActor, new SendHeartBeat());
426         sendReplicate(actorContext, lastIndex+1);
427
428         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
429         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
430
431         assertEquals(0, allMessages.get(0).getEntries().size());
432         assertEquals(1, allMessages.get(1).getEntries().size());
433     }
434
435
436     @Test
437     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
438         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
439
440         MockRaftActorContext actorContext = createActorContext();
441
442         leader = new Leader(actorContext);
443
444         actorContext.setLastApplied(0);
445
446         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
447         long term = actorContext.getTermInformation().getCurrentTerm();
448         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
449                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
450
451         actorContext.getReplicatedLog().append(newEntry);
452
453         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
454                 new Replicate(leaderActor, "state-id", newEntry));
455
456         // State should not change
457         assertTrue(raftBehavior instanceof Leader);
458
459         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
460
461         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
462         // one since lastApplied state is 0.
463         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
464                 leaderActor, ApplyState.class);
465         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
466
467         for(int i = 0; i <= newLogIndex - 1; i++ ) {
468             ApplyState applyState = applyStateList.get(i);
469             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
470             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
471         }
472
473         ApplyState last = applyStateList.get((int) newLogIndex - 1);
474         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
475         assertEquals("getIdentifier", "state-id", last.getIdentifier());
476     }
477
478     @Test
479     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
480         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
481
482         MockRaftActorContext actorContext = createActorContextWithFollower();
483
484         Map<String, String> leadersSnapshot = new HashMap<>();
485         leadersSnapshot.put("1", "A");
486         leadersSnapshot.put("2", "B");
487         leadersSnapshot.put("3", "C");
488
489         //clears leaders log
490         actorContext.getReplicatedLog().removeFrom(0);
491
492         final int commitIndex = 3;
493         final int snapshotIndex = 2;
494         final int newEntryIndex = 4;
495         final int snapshotTerm = 1;
496         final int currentTerm = 2;
497
498         // set the snapshot variables in replicatedlog
499         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
500         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
501         actorContext.setCommitIndex(commitIndex);
502         //set follower timeout to 2 mins, helps during debugging
503         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
504
505         leader = new Leader(actorContext);
506
507         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
508         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
509
510         // new entry
511         ReplicatedLogImplEntry entry =
512                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
513                         new MockRaftActorContext.MockPayload("D"));
514
515         //update follower timestamp
516         leader.markFollowerActive(FOLLOWER_ID);
517
518         ByteString bs = toByteString(leadersSnapshot);
519         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
520                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
521         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
522         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
523
524         //send first chunk and no InstallSnapshotReply received yet
525         fts.getNextChunk();
526         fts.incrementChunkIndex();
527
528         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
529                 TimeUnit.MILLISECONDS);
530
531         leader.handleMessage(leaderActor, new SendHeartBeat());
532
533         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
534
535         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
536
537         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
538
539         //InstallSnapshotReply received
540         fts.markSendStatus(true);
541
542         leader.handleMessage(leaderActor, new SendHeartBeat());
543
544         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
545
546         assertEquals(commitIndex, is.getLastIncludedIndex());
547     }
548
549     @Test
550     public void testSendAppendEntriesSnapshotScenario() throws Exception {
551         logStart("testSendAppendEntriesSnapshotScenario");
552
553         MockRaftActorContext actorContext = createActorContextWithFollower();
554
555         Map<String, String> leadersSnapshot = new HashMap<>();
556         leadersSnapshot.put("1", "A");
557         leadersSnapshot.put("2", "B");
558         leadersSnapshot.put("3", "C");
559
560         //clears leaders log
561         actorContext.getReplicatedLog().removeFrom(0);
562
563         final int followersLastIndex = 2;
564         final int snapshotIndex = 3;
565         final int newEntryIndex = 4;
566         final int snapshotTerm = 1;
567         final int currentTerm = 2;
568
569         // set the snapshot variables in replicatedlog
570         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572         actorContext.setCommitIndex(followersLastIndex);
573
574         leader = new Leader(actorContext);
575
576         // Leader will send an immediate heartbeat - ignore it.
577         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
578
579         // new entry
580         ReplicatedLogImplEntry entry =
581                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
582                         new MockRaftActorContext.MockPayload("D"));
583
584         actorContext.getReplicatedLog().append(entry);
585
586         //update follower timestamp
587         leader.markFollowerActive(FOLLOWER_ID);
588
589         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
590         RaftActorBehavior raftBehavior = leader.handleMessage(
591                 leaderActor, new Replicate(null, "state-id", entry));
592
593         assertTrue(raftBehavior instanceof Leader);
594
595         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
596     }
597
598     @Test
599     public void testInitiateInstallSnapshot() throws Exception {
600         logStart("testInitiateInstallSnapshot");
601
602         MockRaftActorContext actorContext = createActorContextWithFollower();
603
604         //clears leaders log
605         actorContext.getReplicatedLog().removeFrom(0);
606
607         final int followersLastIndex = 2;
608         final int snapshotIndex = 3;
609         final int newEntryIndex = 4;
610         final int snapshotTerm = 1;
611         final int currentTerm = 2;
612
613         // set the snapshot variables in replicatedlog
614         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
615         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
616         actorContext.setLastApplied(3);
617         actorContext.setCommitIndex(followersLastIndex);
618
619         leader = new Leader(actorContext);
620
621         // Leader will send an immediate heartbeat - ignore it.
622         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
623
624         // set the snapshot as absent and check if capture-snapshot is invoked.
625         leader.setSnapshot(null);
626
627         // new entry
628         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
629                 new MockRaftActorContext.MockPayload("D"));
630
631         actorContext.getReplicatedLog().append(entry);
632
633         //update follower timestamp
634         leader.markFollowerActive(FOLLOWER_ID);
635
636         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
637
638         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
639
640         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
641
642         assertTrue(cs.isInstallSnapshotInitiated());
643         assertEquals(3, cs.getLastAppliedIndex());
644         assertEquals(1, cs.getLastAppliedTerm());
645         assertEquals(4, cs.getLastIndex());
646         assertEquals(2, cs.getLastTerm());
647
648         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
649         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
650
651         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
652     }
653
654     @Test
655     public void testInitiateForceInstallSnapshot() throws Exception {
656         logStart("testInitiateForceInstallSnapshot");
657
658         MockRaftActorContext actorContext = createActorContextWithFollower();
659
660         final int followersLastIndex = 2;
661         final int snapshotIndex = -1;
662         final int newEntryIndex = 4;
663         final int snapshotTerm = -1;
664         final int currentTerm = 2;
665
666         // set the snapshot variables in replicatedlog
667         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
668         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
669         actorContext.setLastApplied(3);
670         actorContext.setCommitIndex(followersLastIndex);
671
672         actorContext.getReplicatedLog().removeFrom(0);
673
674         leader = new Leader(actorContext);
675
676         // Leader will send an immediate heartbeat - ignore it.
677         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
678
679         // set the snapshot as absent and check if capture-snapshot is invoked.
680         leader.setSnapshot(null);
681
682         for(int i=0;i<4;i++) {
683             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
684                     new MockRaftActorContext.MockPayload("X" + i)));
685         }
686
687         // new entry
688         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
689                 new MockRaftActorContext.MockPayload("D"));
690
691         actorContext.getReplicatedLog().append(entry);
692
693         //update follower timestamp
694         leader.markFollowerActive(FOLLOWER_ID);
695
696         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
697         // installed with a SendInstallSnapshot
698         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
699
700         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
701
702         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
703
704         assertTrue(cs.isInstallSnapshotInitiated());
705         assertEquals(3, cs.getLastAppliedIndex());
706         assertEquals(1, cs.getLastAppliedTerm());
707         assertEquals(4, cs.getLastIndex());
708         assertEquals(2, cs.getLastTerm());
709
710         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
711         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
712
713         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
714     }
715
716
717     @Test
718     public void testInstallSnapshot() throws Exception {
719         logStart("testInstallSnapshot");
720
721         MockRaftActorContext actorContext = createActorContextWithFollower();
722
723         Map<String, String> leadersSnapshot = new HashMap<>();
724         leadersSnapshot.put("1", "A");
725         leadersSnapshot.put("2", "B");
726         leadersSnapshot.put("3", "C");
727
728         //clears leaders log
729         actorContext.getReplicatedLog().removeFrom(0);
730
731         final int lastAppliedIndex = 3;
732         final int snapshotIndex = 2;
733         final int snapshotTerm = 1;
734         final int currentTerm = 2;
735
736         // set the snapshot variables in replicatedlog
737         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
738         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
739         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
740         actorContext.setCommitIndex(lastAppliedIndex);
741         actorContext.setLastApplied(lastAppliedIndex);
742
743         leader = new Leader(actorContext);
744
745         // Initial heartbeat.
746         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
747
748         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
749         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
750
751         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
752                 Collections.<ReplicatedLogEntry>emptyList(),
753                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
754
755         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
756
757         assertTrue(raftBehavior instanceof Leader);
758
759         // check if installsnapshot gets called with the correct values.
760
761         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
762
763         assertNotNull(installSnapshot.getData());
764         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
765         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
766
767         assertEquals(currentTerm, installSnapshot.getTerm());
768     }
769
770     @Test
771     public void testForceInstallSnapshot() throws Exception {
772         logStart("testForceInstallSnapshot");
773
774         MockRaftActorContext actorContext = createActorContextWithFollower();
775
776         Map<String, String> leadersSnapshot = new HashMap<>();
777         leadersSnapshot.put("1", "A");
778         leadersSnapshot.put("2", "B");
779         leadersSnapshot.put("3", "C");
780
781         final int lastAppliedIndex = 3;
782         final int snapshotIndex = -1;
783         final int snapshotTerm = -1;
784         final int currentTerm = 2;
785
786         // set the snapshot variables in replicatedlog
787         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
788         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
789         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
790         actorContext.setCommitIndex(lastAppliedIndex);
791         actorContext.setLastApplied(lastAppliedIndex);
792
793         leader = new Leader(actorContext);
794
795         // Initial heartbeat.
796         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
797
798         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
799         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
800
801         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
802                 Collections.<ReplicatedLogEntry>emptyList(),
803                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
804
805         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
806
807         assertTrue(raftBehavior instanceof Leader);
808
809         // check if installsnapshot gets called with the correct values.
810
811         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
812
813         assertNotNull(installSnapshot.getData());
814         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
815         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
816
817         assertEquals(currentTerm, installSnapshot.getTerm());
818     }
819
820     @Test
821     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
822         logStart("testHandleInstallSnapshotReplyLastChunk");
823
824         MockRaftActorContext actorContext = createActorContextWithFollower();
825
826         final int commitIndex = 3;
827         final int snapshotIndex = 2;
828         final int snapshotTerm = 1;
829         final int currentTerm = 2;
830
831         actorContext.setCommitIndex(commitIndex);
832
833         leader = new Leader(actorContext);
834
835         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
836         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
837
838         // Ignore initial heartbeat.
839         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
840
841         Map<String, String> leadersSnapshot = new HashMap<>();
842         leadersSnapshot.put("1", "A");
843         leadersSnapshot.put("2", "B");
844         leadersSnapshot.put("3", "C");
845
846         // set the snapshot variables in replicatedlog
847
848         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
849         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
850         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
851
852         ByteString bs = toByteString(leadersSnapshot);
853         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
854                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
855         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
856         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
857         while(!fts.isLastChunk(fts.getChunkIndex())) {
858             fts.getNextChunk();
859             fts.incrementChunkIndex();
860         }
861
862         //clears leaders log
863         actorContext.getReplicatedLog().removeFrom(0);
864
865         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
866                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
867
868         assertTrue(raftBehavior instanceof Leader);
869
870         assertEquals(0, leader.followerSnapshotSize());
871         assertEquals(1, leader.followerLogSize());
872         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
873         assertNotNull(fli);
874         assertEquals(commitIndex, fli.getMatchIndex());
875         assertEquals(commitIndex + 1, fli.getNextIndex());
876     }
877
878     @Test
879     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
880         logStart("testSendSnapshotfromInstallSnapshotReply");
881
882         MockRaftActorContext actorContext = createActorContextWithFollower();
883
884         final int commitIndex = 3;
885         final int snapshotIndex = 2;
886         final int snapshotTerm = 1;
887         final int currentTerm = 2;
888
889         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
890             @Override
891             public int getSnapshotChunkSize() {
892                 return 50;
893             }
894         };
895         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
896         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
897
898         actorContext.setConfigParams(configParams);
899         actorContext.setCommitIndex(commitIndex);
900
901         leader = new Leader(actorContext);
902
903         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
904         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
905
906         Map<String, String> leadersSnapshot = new HashMap<>();
907         leadersSnapshot.put("1", "A");
908         leadersSnapshot.put("2", "B");
909         leadersSnapshot.put("3", "C");
910
911         // set the snapshot variables in replicatedlog
912         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
913         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
914         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
915
916         ByteString bs = toByteString(leadersSnapshot);
917         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
918                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
919         leader.setSnapshot(snapshot);
920
921         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
922
923         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
924
925         assertEquals(1, installSnapshot.getChunkIndex());
926         assertEquals(3, installSnapshot.getTotalChunks());
927
928         followerActor.underlyingActor().clear();
929         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
930                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
931
932         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
933
934         assertEquals(2, installSnapshot.getChunkIndex());
935         assertEquals(3, installSnapshot.getTotalChunks());
936
937         followerActor.underlyingActor().clear();
938         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
939                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
940
941         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
942
943         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
944         followerActor.underlyingActor().clear();
945         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
946                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
947
948         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
949
950         Assert.assertNull(installSnapshot);
951     }
952
953
954     @Test
955     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
956         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
957
958         MockRaftActorContext actorContext = createActorContextWithFollower();
959
960         final int commitIndex = 3;
961         final int snapshotIndex = 2;
962         final int snapshotTerm = 1;
963         final int currentTerm = 2;
964
965         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
966             @Override
967             public int getSnapshotChunkSize() {
968                 return 50;
969             }
970         });
971
972         actorContext.setCommitIndex(commitIndex);
973
974         leader = new Leader(actorContext);
975
976         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
977         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
978
979         Map<String, String> leadersSnapshot = new HashMap<>();
980         leadersSnapshot.put("1", "A");
981         leadersSnapshot.put("2", "B");
982         leadersSnapshot.put("3", "C");
983
984         // set the snapshot variables in replicatedlog
985         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
986         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
987         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
988
989         ByteString bs = toByteString(leadersSnapshot);
990         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
991                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
992         leader.setSnapshot(snapshot);
993
994         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
995         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
996
997         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
998
999         assertEquals(1, installSnapshot.getChunkIndex());
1000         assertEquals(3, installSnapshot.getTotalChunks());
1001
1002         followerActor.underlyingActor().clear();
1003
1004         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1005                 FOLLOWER_ID, -1, false));
1006
1007         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1008                 TimeUnit.MILLISECONDS);
1009
1010         leader.handleMessage(leaderActor, new SendHeartBeat());
1011
1012         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1013
1014         assertEquals(1, installSnapshot.getChunkIndex());
1015         assertEquals(3, installSnapshot.getTotalChunks());
1016     }
1017
1018     @Test
1019     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1020         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1021
1022         MockRaftActorContext actorContext = createActorContextWithFollower();
1023
1024         final int commitIndex = 3;
1025         final int snapshotIndex = 2;
1026         final int snapshotTerm = 1;
1027         final int currentTerm = 2;
1028
1029         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1030             @Override
1031             public int getSnapshotChunkSize() {
1032                 return 50;
1033             }
1034         });
1035
1036         actorContext.setCommitIndex(commitIndex);
1037
1038         leader = new Leader(actorContext);
1039
1040         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1041         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1042
1043         Map<String, String> leadersSnapshot = new HashMap<>();
1044         leadersSnapshot.put("1", "A");
1045         leadersSnapshot.put("2", "B");
1046         leadersSnapshot.put("3", "C");
1047
1048         // set the snapshot variables in replicatedlog
1049         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1050         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1051         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1052
1053         ByteString bs = toByteString(leadersSnapshot);
1054         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1055                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1056         leader.setSnapshot(snapshot);
1057
1058         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1059
1060         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1061
1062         assertEquals(1, installSnapshot.getChunkIndex());
1063         assertEquals(3, installSnapshot.getTotalChunks());
1064         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1065
1066         int hashCode = installSnapshot.getData().hashCode();
1067
1068         followerActor.underlyingActor().clear();
1069
1070         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1071                 FOLLOWER_ID, 1, true));
1072
1073         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1074
1075         assertEquals(2, installSnapshot.getChunkIndex());
1076         assertEquals(3, installSnapshot.getTotalChunks());
1077         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1078     }
1079
1080     @Test
1081     public void testFollowerToSnapshotLogic() {
1082         logStart("testFollowerToSnapshotLogic");
1083
1084         MockRaftActorContext actorContext = createActorContext();
1085
1086         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1087             @Override
1088             public int getSnapshotChunkSize() {
1089                 return 50;
1090             }
1091         });
1092
1093         leader = new Leader(actorContext);
1094
1095         Map<String, String> leadersSnapshot = new HashMap<>();
1096         leadersSnapshot.put("1", "A");
1097         leadersSnapshot.put("2", "B");
1098         leadersSnapshot.put("3", "C");
1099
1100         ByteString bs = toByteString(leadersSnapshot);
1101         byte[] barray = bs.toByteArray();
1102
1103         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1104         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1105
1106         assertEquals(bs.size(), barray.length);
1107
1108         int chunkIndex=0;
1109         for (int i=0; i < barray.length; i = i + 50) {
1110             int j = i + 50;
1111             chunkIndex++;
1112
1113             if (i + 50 > barray.length) {
1114                 j = barray.length;
1115             }
1116
1117             ByteString chunk = fts.getNextChunk();
1118             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1119             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1120
1121             fts.markSendStatus(true);
1122             if (!fts.isLastChunk(chunkIndex)) {
1123                 fts.incrementChunkIndex();
1124             }
1125         }
1126
1127         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1128     }
1129
1130     @Override protected RaftActorBehavior createBehavior(
1131         RaftActorContext actorContext) {
1132         return new Leader(actorContext);
1133     }
1134
1135     @Override
1136     protected MockRaftActorContext createActorContext() {
1137         return createActorContext(leaderActor);
1138     }
1139
1140     @Override
1141     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1142         return createActorContext(LEADER_ID, actorRef);
1143     }
1144
1145     private MockRaftActorContext createActorContextWithFollower() {
1146         MockRaftActorContext actorContext = createActorContext();
1147         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1148                 followerActor.path().toString()).build());
1149         return actorContext;
1150     }
1151
1152     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1153         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1154         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1155         configParams.setElectionTimeoutFactor(100000);
1156         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1157         context.setConfigParams(configParams);
1158         context.setPayloadVersion(payloadVersion);
1159         return context;
1160     }
1161
1162     private MockRaftActorContext createFollowerActorContextWithLeader() {
1163         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1164         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1165         followerConfig.setElectionTimeoutFactor(10000);
1166         followerActorContext.setConfigParams(followerConfig);
1167         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1168         return followerActorContext;
1169     }
1170
1171     @Test
1172     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1173         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1174
1175         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1176
1177         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1178
1179         Follower follower = new Follower(followerActorContext);
1180         followerActor.underlyingActor().setBehavior(follower);
1181
1182         Map<String, String> peerAddresses = new HashMap<>();
1183         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1184
1185         leaderActorContext.setPeerAddresses(peerAddresses);
1186
1187         leaderActorContext.getReplicatedLog().removeFrom(0);
1188
1189         //create 3 entries
1190         leaderActorContext.setReplicatedLog(
1191                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1192
1193         leaderActorContext.setCommitIndex(1);
1194
1195         followerActorContext.getReplicatedLog().removeFrom(0);
1196
1197         // follower too has the exact same log entries and has the same commit index
1198         followerActorContext.setReplicatedLog(
1199                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1200
1201         followerActorContext.setCommitIndex(1);
1202
1203         leader = new Leader(leaderActorContext);
1204
1205         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1206
1207         assertEquals(1, appendEntries.getLeaderCommit());
1208         assertEquals(0, appendEntries.getEntries().size());
1209         assertEquals(0, appendEntries.getPrevLogIndex());
1210
1211         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1212                 leaderActor, AppendEntriesReply.class);
1213
1214         assertEquals(2, appendEntriesReply.getLogLastIndex());
1215         assertEquals(1, appendEntriesReply.getLogLastTerm());
1216
1217         // follower returns its next index
1218         assertEquals(2, appendEntriesReply.getLogLastIndex());
1219         assertEquals(1, appendEntriesReply.getLogLastTerm());
1220
1221         follower.close();
1222     }
1223
1224     @Test
1225     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1226         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1227
1228         MockRaftActorContext leaderActorContext = createActorContext();
1229
1230         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1231         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1232
1233         Follower follower = new Follower(followerActorContext);
1234         followerActor.underlyingActor().setBehavior(follower);
1235
1236         Map<String, String> leaderPeerAddresses = new HashMap<>();
1237         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1238
1239         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1240
1241         leaderActorContext.getReplicatedLog().removeFrom(0);
1242
1243         leaderActorContext.setReplicatedLog(
1244                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1245
1246         leaderActorContext.setCommitIndex(1);
1247
1248         followerActorContext.getReplicatedLog().removeFrom(0);
1249
1250         followerActorContext.setReplicatedLog(
1251                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1252
1253         // follower has the same log entries but its commit index > leaders commit index
1254         followerActorContext.setCommitIndex(2);
1255
1256         leader = new Leader(leaderActorContext);
1257
1258         // Initial heartbeat
1259         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1260
1261         assertEquals(1, appendEntries.getLeaderCommit());
1262         assertEquals(0, appendEntries.getEntries().size());
1263         assertEquals(0, appendEntries.getPrevLogIndex());
1264
1265         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1266                 leaderActor, AppendEntriesReply.class);
1267
1268         assertEquals(2, appendEntriesReply.getLogLastIndex());
1269         assertEquals(1, appendEntriesReply.getLogLastTerm());
1270
1271         leaderActor.underlyingActor().setBehavior(follower);
1272         leader.handleMessage(followerActor, appendEntriesReply);
1273
1274         leaderActor.underlyingActor().clear();
1275         followerActor.underlyingActor().clear();
1276
1277         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1278                 TimeUnit.MILLISECONDS);
1279
1280         leader.handleMessage(leaderActor, new SendHeartBeat());
1281
1282         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1283
1284         assertEquals(2, appendEntries.getLeaderCommit());
1285         assertEquals(0, appendEntries.getEntries().size());
1286         assertEquals(2, appendEntries.getPrevLogIndex());
1287
1288         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1289
1290         assertEquals(2, appendEntriesReply.getLogLastIndex());
1291         assertEquals(1, appendEntriesReply.getLogLastTerm());
1292
1293         assertEquals(2, followerActorContext.getCommitIndex());
1294
1295         follower.close();
1296     }
1297
1298     @Test
1299     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1300         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1301
1302         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1303         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1304                 new FiniteDuration(1000, TimeUnit.SECONDS));
1305
1306         leaderActorContext.setReplicatedLog(
1307                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1308         long leaderCommitIndex = 2;
1309         leaderActorContext.setCommitIndex(leaderCommitIndex);
1310         leaderActorContext.setLastApplied(leaderCommitIndex);
1311
1312         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1313         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1314
1315         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1316
1317         followerActorContext.setReplicatedLog(
1318                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1319         followerActorContext.setCommitIndex(0);
1320         followerActorContext.setLastApplied(0);
1321
1322         Follower follower = new Follower(followerActorContext);
1323         followerActor.underlyingActor().setBehavior(follower);
1324
1325         leader = new Leader(leaderActorContext);
1326
1327         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1328         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1329
1330         MessageCollectorActor.clearMessages(followerActor);
1331         MessageCollectorActor.clearMessages(leaderActor);
1332
1333         // Verify initial AppendEntries sent with the leader's current commit index.
1334         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1335         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1336         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1337
1338         leaderActor.underlyingActor().setBehavior(leader);
1339
1340         leader.handleMessage(followerActor, appendEntriesReply);
1341
1342         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1343         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1344
1345         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1346         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1347         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1348
1349         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1350         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1351                 appendEntries.getEntries().get(0).getData());
1352         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1353         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1354                 appendEntries.getEntries().get(1).getData());
1355
1356         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1357         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1358
1359         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1360
1361         ApplyState applyState = applyStateList.get(0);
1362         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1363         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1364         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1365                 applyState.getReplicatedLogEntry().getData());
1366
1367         applyState = applyStateList.get(1);
1368         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1369         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1370         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1371                 applyState.getReplicatedLogEntry().getData());
1372
1373         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1374         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1375     }
1376
1377     @Test
1378     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1379         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1380
1381         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1382         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1383                 new FiniteDuration(1000, TimeUnit.SECONDS));
1384
1385         leaderActorContext.setReplicatedLog(
1386                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1387         long leaderCommitIndex = 1;
1388         leaderActorContext.setCommitIndex(leaderCommitIndex);
1389         leaderActorContext.setLastApplied(leaderCommitIndex);
1390
1391         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1392         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1393
1394         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1395
1396         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1397         followerActorContext.setCommitIndex(-1);
1398         followerActorContext.setLastApplied(-1);
1399
1400         Follower follower = new Follower(followerActorContext);
1401         followerActor.underlyingActor().setBehavior(follower);
1402
1403         leader = new Leader(leaderActorContext);
1404
1405         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1406         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1407
1408         MessageCollectorActor.clearMessages(followerActor);
1409         MessageCollectorActor.clearMessages(leaderActor);
1410
1411         // Verify initial AppendEntries sent with the leader's current commit index.
1412         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1413         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1414         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1415
1416         leaderActor.underlyingActor().setBehavior(leader);
1417
1418         leader.handleMessage(followerActor, appendEntriesReply);
1419
1420         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1421         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1422
1423         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1424         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1425         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1426
1427         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1428         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1429                 appendEntries.getEntries().get(0).getData());
1430         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1431         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1432                 appendEntries.getEntries().get(1).getData());
1433
1434         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1435         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1436
1437         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1438
1439         ApplyState applyState = applyStateList.get(0);
1440         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1441         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1442         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1443                 applyState.getReplicatedLogEntry().getData());
1444
1445         applyState = applyStateList.get(1);
1446         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1447         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1448         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1449                 applyState.getReplicatedLogEntry().getData());
1450
1451         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1452         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1453     }
1454
1455     @Test
1456     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1457         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1458
1459         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1460         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1461                 new FiniteDuration(1000, TimeUnit.SECONDS));
1462
1463         leaderActorContext.setReplicatedLog(
1464                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1465         long leaderCommitIndex = 1;
1466         leaderActorContext.setCommitIndex(leaderCommitIndex);
1467         leaderActorContext.setLastApplied(leaderCommitIndex);
1468
1469         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1470         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1471
1472         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1473
1474         followerActorContext.setReplicatedLog(
1475                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1476         followerActorContext.setCommitIndex(-1);
1477         followerActorContext.setLastApplied(-1);
1478
1479         Follower follower = new Follower(followerActorContext);
1480         followerActor.underlyingActor().setBehavior(follower);
1481
1482         leader = new Leader(leaderActorContext);
1483
1484         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1485         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1486
1487         MessageCollectorActor.clearMessages(followerActor);
1488         MessageCollectorActor.clearMessages(leaderActor);
1489
1490         // Verify initial AppendEntries sent with the leader's current commit index.
1491         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1492         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1493         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1494
1495         leaderActor.underlyingActor().setBehavior(leader);
1496
1497         leader.handleMessage(followerActor, appendEntriesReply);
1498
1499         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1500         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1501
1502         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1503         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1504         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1505
1506         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1507         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1508         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1509                 appendEntries.getEntries().get(0).getData());
1510         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1511         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1512         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1513                 appendEntries.getEntries().get(1).getData());
1514
1515         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1516         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1517
1518         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1519
1520         ApplyState applyState = applyStateList.get(0);
1521         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1522         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1523         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1524                 applyState.getReplicatedLogEntry().getData());
1525
1526         applyState = applyStateList.get(1);
1527         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1528         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1529         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1530                 applyState.getReplicatedLogEntry().getData());
1531
1532         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1533         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1534         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1535     }
1536
1537     @Test
1538     public void testHandleAppendEntriesReplyWithNewerTerm(){
1539         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1540
1541         MockRaftActorContext leaderActorContext = createActorContext();
1542         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1543                 new FiniteDuration(10000, TimeUnit.SECONDS));
1544
1545         leaderActorContext.setReplicatedLog(
1546                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1547
1548         leader = new Leader(leaderActorContext);
1549         leaderActor.underlyingActor().setBehavior(leader);
1550         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1551
1552         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1553
1554         assertEquals(false, appendEntriesReply.isSuccess());
1555         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1556
1557         MessageCollectorActor.clearMessages(leaderActor);
1558     }
1559
1560     @Test
1561     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1562         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1563
1564         MockRaftActorContext leaderActorContext = createActorContext();
1565         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1566                 new FiniteDuration(10000, TimeUnit.SECONDS));
1567
1568         leaderActorContext.setReplicatedLog(
1569                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1570         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1571
1572         leader = new Leader(leaderActorContext);
1573         leaderActor.underlyingActor().setBehavior(leader);
1574         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1575
1576         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1577
1578         assertEquals(false, appendEntriesReply.isSuccess());
1579         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1580
1581         MessageCollectorActor.clearMessages(leaderActor);
1582     }
1583
1584     @Test
1585     public void testHandleAppendEntriesReplySuccess() throws Exception {
1586         logStart("testHandleAppendEntriesReplySuccess");
1587
1588         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1589
1590         leaderActorContext.setReplicatedLog(
1591                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1592
1593         leaderActorContext.setCommitIndex(1);
1594         leaderActorContext.setLastApplied(1);
1595         leaderActorContext.getTermInformation().update(1, "leader");
1596
1597         leader = new Leader(leaderActorContext);
1598
1599         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1600
1601         short payloadVersion = 5;
1602         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1603
1604         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1605
1606         assertEquals(RaftState.Leader, raftActorBehavior.state());
1607
1608         assertEquals(2, leaderActorContext.getCommitIndex());
1609
1610         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1611                 leaderActor, ApplyJournalEntries.class);
1612
1613         assertEquals(2, leaderActorContext.getLastApplied());
1614
1615         assertEquals(2, applyJournalEntries.getToIndex());
1616
1617         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1618                 ApplyState.class);
1619
1620         assertEquals(1,applyStateList.size());
1621
1622         ApplyState applyState = applyStateList.get(0);
1623
1624         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1625
1626         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1627         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1628     }
1629
1630     @Test
1631     public void testHandleAppendEntriesReplyUnknownFollower(){
1632         logStart("testHandleAppendEntriesReplyUnknownFollower");
1633
1634         MockRaftActorContext leaderActorContext = createActorContext();
1635
1636         leader = new Leader(leaderActorContext);
1637
1638         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1639
1640         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1641
1642         assertEquals(RaftState.Leader, raftActorBehavior.state());
1643     }
1644
1645     @Test
1646     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1647         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1648
1649         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1650         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1651                 new FiniteDuration(1000, TimeUnit.SECONDS));
1652         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1653
1654         leaderActorContext.setReplicatedLog(
1655                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1656         long leaderCommitIndex = 3;
1657         leaderActorContext.setCommitIndex(leaderCommitIndex);
1658         leaderActorContext.setLastApplied(leaderCommitIndex);
1659
1660         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1661         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1662         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1663         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1664
1665         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1666
1667         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1668         followerActorContext.setCommitIndex(-1);
1669         followerActorContext.setLastApplied(-1);
1670
1671         Follower follower = new Follower(followerActorContext);
1672         followerActor.underlyingActor().setBehavior(follower);
1673
1674         leader = new Leader(leaderActorContext);
1675
1676         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1677         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1678
1679         MessageCollectorActor.clearMessages(followerActor);
1680         MessageCollectorActor.clearMessages(leaderActor);
1681
1682         // Verify initial AppendEntries sent with the leader's current commit index.
1683         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1684         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1685         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1686
1687         leaderActor.underlyingActor().setBehavior(leader);
1688
1689         leader.handleMessage(followerActor, appendEntriesReply);
1690
1691         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1692         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1693
1694         appendEntries = appendEntriesList.get(0);
1695         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1696         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1697         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1698
1699         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1700         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1701                 appendEntries.getEntries().get(0).getData());
1702         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1703         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1704                 appendEntries.getEntries().get(1).getData());
1705
1706         appendEntries = appendEntriesList.get(1);
1707         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1708         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1709         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1710
1711         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1712         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1713                 appendEntries.getEntries().get(0).getData());
1714         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1715         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1716                 appendEntries.getEntries().get(1).getData());
1717
1718         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1719         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1720
1721         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1722
1723         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1724         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1725     }
1726
1727     @Test
1728     public void testHandleRequestVoteReply(){
1729         logStart("testHandleRequestVoteReply");
1730
1731         MockRaftActorContext leaderActorContext = createActorContext();
1732
1733         leader = new Leader(leaderActorContext);
1734
1735         // Should be a no-op.
1736         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1737                 new RequestVoteReply(1, true));
1738
1739         assertEquals(RaftState.Leader, raftActorBehavior.state());
1740
1741         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1742
1743         assertEquals(RaftState.Leader, raftActorBehavior.state());
1744     }
1745
1746     @Test
1747     public void testIsolatedLeaderCheckNoFollowers() {
1748         logStart("testIsolatedLeaderCheckNoFollowers");
1749
1750         MockRaftActorContext leaderActorContext = createActorContext();
1751
1752         leader = new Leader(leaderActorContext);
1753         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1754         Assert.assertTrue(behavior instanceof Leader);
1755     }
1756
1757     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1758         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1759         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1760
1761         MockRaftActorContext leaderActorContext = createActorContext();
1762
1763         Map<String, String> peerAddresses = new HashMap<>();
1764         peerAddresses.put("follower-1", followerActor1.path().toString());
1765         peerAddresses.put("follower-2", followerActor2.path().toString());
1766
1767         leaderActorContext.setPeerAddresses(peerAddresses);
1768         leaderActorContext.setRaftPolicy(raftPolicy);
1769
1770         leader = new Leader(leaderActorContext);
1771
1772         leader.markFollowerActive("follower-1");
1773         leader.markFollowerActive("follower-2");
1774         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1775         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1776                 behavior instanceof Leader);
1777
1778         // kill 1 follower and verify if that got killed
1779         final JavaTestKit probe = new JavaTestKit(getSystem());
1780         probe.watch(followerActor1);
1781         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1782         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1783         assertEquals(termMsg1.getActor(), followerActor1);
1784
1785         leader.markFollowerInActive("follower-1");
1786         leader.markFollowerActive("follower-2");
1787         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1788         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1789                 behavior instanceof Leader);
1790
1791         // kill 2nd follower and leader should change to Isolated leader
1792         followerActor2.tell(PoisonPill.getInstance(), null);
1793         probe.watch(followerActor2);
1794         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1795         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1796         assertEquals(termMsg2.getActor(), followerActor2);
1797
1798         leader.markFollowerInActive("follower-2");
1799         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1800     }
1801
1802     @Test
1803     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1804         logStart("testIsolatedLeaderCheckTwoFollowers");
1805
1806         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1807
1808         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1809             behavior instanceof IsolatedLeader);
1810     }
1811
1812     @Test
1813     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1814         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1815
1816         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1817
1818         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1819                 behavior instanceof Leader);
1820     }
1821
1822     @Test
1823     public void testLaggingFollowerStarvation() throws Exception {
1824         logStart("testLaggingFollowerStarvation");
1825         new JavaTestKit(getSystem()) {{
1826             String leaderActorId = actorFactory.generateActorId("leader");
1827             String follower1ActorId = actorFactory.generateActorId("follower");
1828             String follower2ActorId = actorFactory.generateActorId("follower");
1829
1830             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1831                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1832             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1833             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1834
1835             MockRaftActorContext leaderActorContext =
1836                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1837
1838             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1839             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1840             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1841
1842             leaderActorContext.setConfigParams(configParams);
1843
1844             leaderActorContext.setReplicatedLog(
1845                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1846
1847             Map<String, String> peerAddresses = new HashMap<>();
1848             peerAddresses.put(follower1ActorId,
1849                     follower1Actor.path().toString());
1850             peerAddresses.put(follower2ActorId,
1851                     follower2Actor.path().toString());
1852
1853             leaderActorContext.setPeerAddresses(peerAddresses);
1854             leaderActorContext.getTermInformation().update(1, leaderActorId);
1855
1856             RaftActorBehavior leader = createBehavior(leaderActorContext);
1857
1858             leaderActor.underlyingActor().setBehavior(leader);
1859
1860             for(int i=1;i<6;i++) {
1861                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1862                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1863                 assertTrue(newBehavior == leader);
1864                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1865             }
1866
1867             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1868             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1869
1870             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1871                     heartbeats.size() > 1);
1872
1873             // Check if follower-2 got AppendEntries during this time and was not starved
1874             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1875
1876             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1877                     appendEntries.size() > 1);
1878
1879         }};
1880     }
1881
1882     @Override
1883     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1884             ActorRef actorRef, RaftRPC rpc) throws Exception {
1885         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1886         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1887     }
1888
1889     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1890
1891         private final long electionTimeOutIntervalMillis;
1892         private final int snapshotChunkSize;
1893
1894         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1895             super();
1896             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1897             this.snapshotChunkSize = snapshotChunkSize;
1898         }
1899
1900         @Override
1901         public FiniteDuration getElectionTimeOutInterval() {
1902             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1903         }
1904
1905         @Override
1906         public int getSnapshotChunkSize() {
1907             return snapshotChunkSize;
1908         }
1909     }
1910 }