Remove unused exceptions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
69 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
70 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
75 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
83
84     static final String FOLLOWER_ID = "follower";
85     public static final String LEADER_ID = "leader";
86
87     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
89
90     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
92
93     private Leader leader;
94     private final short payloadVersion = 5;
95
96     @Override
97     @After
98     public void tearDown() {
99         if (leader != null) {
100             leader.close();
101         }
102
103         super.tearDown();
104     }
105
106     @Test
107     public void testHandleMessageForUnknownMessage() {
108         logStart("testHandleMessageForUnknownMessage");
109
110         leader = new Leader(createActorContext());
111
112         // handle message should null when it receives an unknown message
113         assertNull(leader.handleMessage(followerActor, "foo"));
114     }
115
116     @Test
117     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
119
120         MockRaftActorContext actorContext = createActorContextWithFollower();
121         actorContext.setCommitIndex(-1);
122         actorContext.setPayloadVersion(payloadVersion);
123
124         long term = 1;
125         actorContext.getTermInformation().update(term, "");
126
127         leader = new Leader(actorContext);
128         actorContext.setCurrentBehavior(leader);
129
130         // Leader should send an immediate heartbeat with no entries as follower is inactive.
131         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133         assertEquals("getTerm", term, appendEntries.getTerm());
134         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136         assertEquals("Entries size", 0, appendEntries.getEntries().size());
137         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
138
139         // The follower would normally reply - simulate that explicitly here.
140         leader.handleMessage(followerActor, new AppendEntriesReply(
141                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
143
144         followerActor.underlyingActor().clear();
145
146         // Sleep for the heartbeat interval so AppendEntries is sent.
147         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
149
150         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
151
152         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155         assertEquals("Entries size", 1, appendEntries.getEntries().size());
156         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
159     }
160
161
162     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163         return sendReplicate(actorContext, 1, index);
164     }
165
166     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
167         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
168     }
169
170     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
171         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
172         actorContext.getReplicatedLog().append(newEntry);
173         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
174     }
175
176     @Test
177     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
178         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
179
180         MockRaftActorContext actorContext = createActorContextWithFollower();
181
182         long term = 1;
183         actorContext.getTermInformation().update(term, "");
184
185         leader = new Leader(actorContext);
186
187         // Leader will send an immediate heartbeat - ignore it.
188         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189
190         // The follower would normally reply - simulate that explicitly here.
191         long lastIndex = actorContext.getReplicatedLog().lastIndex();
192         leader.handleMessage(followerActor, new AppendEntriesReply(
193                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
194         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
195
196         followerActor.underlyingActor().clear();
197
198         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
199
200         // State should not change
201         assertTrue(raftBehavior instanceof Leader);
202
203         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
204         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
205         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
206         assertEquals("Entries size", 1, appendEntries.getEntries().size());
207         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
208         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
209         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
210         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
211     }
212
213     @Test
214     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
215         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
216
217         MockRaftActorContext actorContext = createActorContextWithFollower();
218         actorContext.setCommitIndex(-1);
219         actorContext.setLastApplied(-1);
220
221         // The raft context is initialized with a couple log entries. However the commitIndex
222         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
223         // committed and applied. Now it regains leadership with a higher term (2).
224         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
225         long newTerm = prevTerm + 1;
226         actorContext.getTermInformation().update(newTerm, "");
227
228         leader = new Leader(actorContext);
229         actorContext.setCurrentBehavior(leader);
230
231         // Leader will send an immediate heartbeat - ignore it.
232         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233
234         // The follower replies with the leader's current last index and term, simulating that it is
235         // up to date with the leader.
236         long lastIndex = actorContext.getReplicatedLog().lastIndex();
237         leader.handleMessage(followerActor, new AppendEntriesReply(
238                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
239
240         // The commit index should not get updated even though consensus was reached. This is b/c the
241         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
242         // from previous terms by counting replicas".
243         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
244
245         followerActor.underlyingActor().clear();
246
247         // Now replicate a new entry with the new term 2.
248         long newIndex = lastIndex + 1;
249         sendReplicate(actorContext, newTerm, newIndex);
250
251         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
252         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
253         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
254         assertEquals("Entries size", 1, appendEntries.getEntries().size());
255         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
256         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
257         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
258
259         // The follower replies with success. The leader should now update the commit index to the new index
260         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
261         // prior entries are committed indirectly".
262         leader.handleMessage(followerActor, new AppendEntriesReply(
263                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
264
265         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
266     }
267
268     @Test
269     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
270         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
271
272         MockRaftActorContext actorContext = createActorContextWithFollower();
273         actorContext.setRaftPolicy(createRaftPolicy(true, true));
274
275         long term = 1;
276         actorContext.getTermInformation().update(term, "");
277
278         leader = new Leader(actorContext);
279
280         // Leader will send an immediate heartbeat - ignore it.
281         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282
283         // The follower would normally reply - simulate that explicitly here.
284         long lastIndex = actorContext.getReplicatedLog().lastIndex();
285         leader.handleMessage(followerActor, new AppendEntriesReply(
286                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
287         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
288
289         followerActor.underlyingActor().clear();
290
291         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
292
293         // State should not change
294         assertTrue(raftBehavior instanceof Leader);
295
296         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
297         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
298         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
299         assertEquals("Entries size", 1, appendEntries.getEntries().size());
300         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
301         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
302         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
303         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
304     }
305
306     @Test
307     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
308         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
309
310         MockRaftActorContext actorContext = createActorContextWithFollower();
311         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312             @Override
313             public FiniteDuration getHeartBeatInterval() {
314                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
315             }
316         });
317
318         long term = 1;
319         actorContext.getTermInformation().update(term, "");
320
321         leader = new Leader(actorContext);
322
323         // Leader will send an immediate heartbeat - ignore it.
324         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325
326         // The follower would normally reply - simulate that explicitly here.
327         long lastIndex = actorContext.getReplicatedLog().lastIndex();
328         leader.handleMessage(followerActor, new AppendEntriesReply(
329                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
330         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
331
332         followerActor.underlyingActor().clear();
333
334         for (int i = 0; i < 5; i++) {
335             sendReplicate(actorContext, lastIndex + i + 1);
336         }
337
338         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
339         // We expect only 1 message to be sent because of two reasons,
340         // - an append entries reply was not received
341         // - the heartbeat interval has not expired
342         // In this scenario if multiple messages are sent they would likely be duplicates
343         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
344     }
345
346     @Test
347     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
348         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
349
350         MockRaftActorContext actorContext = createActorContextWithFollower();
351         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352             @Override
353             public FiniteDuration getHeartBeatInterval() {
354                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
355             }
356         });
357
358         long term = 1;
359         actorContext.getTermInformation().update(term, "");
360
361         leader = new Leader(actorContext);
362
363         // Leader will send an immediate heartbeat - ignore it.
364         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365
366         // The follower would normally reply - simulate that explicitly here.
367         long lastIndex = actorContext.getReplicatedLog().lastIndex();
368         leader.handleMessage(followerActor, new AppendEntriesReply(
369                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371
372         followerActor.underlyingActor().clear();
373
374         for (int i = 0; i < 3; i++) {
375             sendReplicate(actorContext, lastIndex + i + 1);
376             leader.handleMessage(followerActor, new AppendEntriesReply(
377                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
378
379         }
380
381         for (int i = 3; i < 5; i++) {
382             sendReplicate(actorContext, lastIndex + i + 1);
383         }
384
385         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
386         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
387         // get sent to the follower - but not the 5th
388         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
389
390         for (int i = 0; i < 4; i++) {
391             long expected = allMessages.get(i).getEntries().get(0).getIndex();
392             assertEquals(expected, i + 2);
393         }
394     }
395
396     @Test
397     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
398         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
399
400         MockRaftActorContext actorContext = createActorContextWithFollower();
401         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
402             @Override
403             public FiniteDuration getHeartBeatInterval() {
404                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
405             }
406         });
407
408         long term = 1;
409         actorContext.getTermInformation().update(term, "");
410
411         leader = new Leader(actorContext);
412
413         // Leader will send an immediate heartbeat - ignore it.
414         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
415
416         // The follower would normally reply - simulate that explicitly here.
417         long lastIndex = actorContext.getReplicatedLog().lastIndex();
418         leader.handleMessage(followerActor, new AppendEntriesReply(
419                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
421
422         followerActor.underlyingActor().clear();
423
424         sendReplicate(actorContext, lastIndex + 1);
425
426         // Wait slightly longer than heartbeat duration
427         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
428
429         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
430
431         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
432         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
433
434         assertEquals(1, allMessages.get(0).getEntries().size());
435         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
436         assertEquals(1, allMessages.get(1).getEntries().size());
437         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
438
439     }
440
441     @Test
442     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
443         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
444
445         MockRaftActorContext actorContext = createActorContextWithFollower();
446         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
447             @Override
448             public FiniteDuration getHeartBeatInterval() {
449                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
450             }
451         });
452
453         long term = 1;
454         actorContext.getTermInformation().update(term, "");
455
456         leader = new Leader(actorContext);
457
458         // Leader will send an immediate heartbeat - ignore it.
459         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
460
461         // The follower would normally reply - simulate that explicitly here.
462         long lastIndex = actorContext.getReplicatedLog().lastIndex();
463         leader.handleMessage(followerActor, new AppendEntriesReply(
464                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
465         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
466
467         followerActor.underlyingActor().clear();
468
469         for (int i = 0; i < 3; i++) {
470             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
471             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
472         }
473
474         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
475         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
476     }
477
478     @Test
479     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
480         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
481
482         MockRaftActorContext actorContext = createActorContextWithFollower();
483         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
484             @Override
485             public FiniteDuration getHeartBeatInterval() {
486                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
487             }
488         });
489
490         long term = 1;
491         actorContext.getTermInformation().update(term, "");
492
493         leader = new Leader(actorContext);
494
495         // Leader will send an immediate heartbeat - ignore it.
496         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
497
498         // The follower would normally reply - simulate that explicitly here.
499         long lastIndex = actorContext.getReplicatedLog().lastIndex();
500         leader.handleMessage(followerActor, new AppendEntriesReply(
501                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
502         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
503
504         followerActor.underlyingActor().clear();
505
506         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
507         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
508         sendReplicate(actorContext, lastIndex + 1);
509
510         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
511         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
512
513         assertEquals(0, allMessages.get(0).getEntries().size());
514         assertEquals(1, allMessages.get(1).getEntries().size());
515     }
516
517
518     @Test
519     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
520         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
521
522         MockRaftActorContext actorContext = createActorContext();
523
524         leader = new Leader(actorContext);
525
526         actorContext.setLastApplied(0);
527
528         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
529         long term = actorContext.getTermInformation().getCurrentTerm();
530         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
531                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
532
533         actorContext.getReplicatedLog().append(newEntry);
534
535         final Identifier id = new MockIdentifier("state-id");
536         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
537                 new Replicate(leaderActor, id, newEntry, true));
538
539         // State should not change
540         assertTrue(raftBehavior instanceof Leader);
541
542         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
543
544         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
545         // one since lastApplied state is 0.
546         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
547                 leaderActor, ApplyState.class);
548         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
549
550         for (int i = 0; i <= newLogIndex - 1; i++) {
551             ApplyState applyState = applyStateList.get(i);
552             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
553             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
554         }
555
556         ApplyState last = applyStateList.get((int) newLogIndex - 1);
557         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
558         assertEquals("getIdentifier", id, last.getIdentifier());
559     }
560
561     @Test
562     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
563         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
564
565         final MockRaftActorContext actorContext = createActorContextWithFollower();
566
567         Map<String, String> leadersSnapshot = new HashMap<>();
568         leadersSnapshot.put("1", "A");
569         leadersSnapshot.put("2", "B");
570         leadersSnapshot.put("3", "C");
571
572         //clears leaders log
573         actorContext.getReplicatedLog().removeFrom(0);
574
575         final int commitIndex = 3;
576         final int snapshotIndex = 2;
577         final int snapshotTerm = 1;
578
579         // set the snapshot variables in replicatedlog
580         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
581         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
582         actorContext.setCommitIndex(commitIndex);
583         //set follower timeout to 2 mins, helps during debugging
584         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
585
586         leader = new Leader(actorContext);
587
588         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
589         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
590
591         //update follower timestamp
592         leader.markFollowerActive(FOLLOWER_ID);
593
594         ByteString bs = toByteString(leadersSnapshot);
595         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
596                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
597                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
598         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
599                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
600         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
601         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
602
603         //send first chunk and no InstallSnapshotReply received yet
604         fts.getNextChunk();
605         fts.incrementChunkIndex();
606
607         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
608                 TimeUnit.MILLISECONDS);
609
610         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
611
612         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
613
614         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
615
616         //InstallSnapshotReply received
617         fts.markSendStatus(true);
618
619         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
620
621         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
622
623         assertEquals(commitIndex, is.getLastIncludedIndex());
624     }
625
626     @Test
627     public void testSendAppendEntriesSnapshotScenario() {
628         logStart("testSendAppendEntriesSnapshotScenario");
629
630         final MockRaftActorContext actorContext = createActorContextWithFollower();
631
632         Map<String, String> leadersSnapshot = new HashMap<>();
633         leadersSnapshot.put("1", "A");
634         leadersSnapshot.put("2", "B");
635         leadersSnapshot.put("3", "C");
636
637         //clears leaders log
638         actorContext.getReplicatedLog().removeFrom(0);
639
640         final int followersLastIndex = 2;
641         final int snapshotIndex = 3;
642         final int newEntryIndex = 4;
643         final int snapshotTerm = 1;
644         final int currentTerm = 2;
645
646         // set the snapshot variables in replicatedlog
647         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
648         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
649         actorContext.setCommitIndex(followersLastIndex);
650
651         leader = new Leader(actorContext);
652
653         // Leader will send an immediate heartbeat - ignore it.
654         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
655
656         // new entry
657         SimpleReplicatedLogEntry entry =
658                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
659                         new MockRaftActorContext.MockPayload("D"));
660
661         actorContext.getReplicatedLog().append(entry);
662
663         //update follower timestamp
664         leader.markFollowerActive(FOLLOWER_ID);
665
666         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
667         RaftActorBehavior raftBehavior = leader.handleMessage(
668                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
669
670         assertTrue(raftBehavior instanceof Leader);
671
672         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
673     }
674
675     @Test
676     public void testInitiateInstallSnapshot() {
677         logStart("testInitiateInstallSnapshot");
678
679         MockRaftActorContext actorContext = createActorContextWithFollower();
680
681         //clears leaders log
682         actorContext.getReplicatedLog().removeFrom(0);
683
684         final int followersLastIndex = 2;
685         final int snapshotIndex = 3;
686         final int newEntryIndex = 4;
687         final int snapshotTerm = 1;
688         final int currentTerm = 2;
689
690         // set the snapshot variables in replicatedlog
691         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
692         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
693         actorContext.setLastApplied(3);
694         actorContext.setCommitIndex(followersLastIndex);
695
696         leader = new Leader(actorContext);
697
698         // Leader will send an immediate heartbeat - ignore it.
699         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
700
701         // set the snapshot as absent and check if capture-snapshot is invoked.
702         leader.setSnapshotHolder(null);
703
704         // new entry
705         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
706                 new MockRaftActorContext.MockPayload("D"));
707
708         actorContext.getReplicatedLog().append(entry);
709
710         //update follower timestamp
711         leader.markFollowerActive(FOLLOWER_ID);
712
713         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
714
715         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
716
717         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
718
719         assertEquals(3, cs.getLastAppliedIndex());
720         assertEquals(1, cs.getLastAppliedTerm());
721         assertEquals(4, cs.getLastIndex());
722         assertEquals(2, cs.getLastTerm());
723
724         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
725         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
726
727         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
728     }
729
730     @Test
731     public void testInitiateForceInstallSnapshot() throws Exception {
732         logStart("testInitiateForceInstallSnapshot");
733
734         MockRaftActorContext actorContext = createActorContextWithFollower();
735
736         final int followersLastIndex = 2;
737         final int snapshotIndex = -1;
738         final int newEntryIndex = 4;
739         final int snapshotTerm = -1;
740         final int currentTerm = 2;
741
742         // set the snapshot variables in replicatedlog
743         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
744         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
745         actorContext.setLastApplied(3);
746         actorContext.setCommitIndex(followersLastIndex);
747
748         actorContext.getReplicatedLog().removeFrom(0);
749
750         AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
751         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
752
753         leader = new Leader(actorContext);
754         actorContext.setCurrentBehavior(leader);
755
756         // Leader will send an immediate heartbeat - ignore it.
757         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
758
759         // set the snapshot as absent and check if capture-snapshot is invoked.
760         leader.setSnapshotHolder(null);
761
762         for (int i = 0; i < 4; i++) {
763             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
764                     new MockRaftActorContext.MockPayload("X" + i)));
765         }
766
767         // new entry
768         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
769                 new MockRaftActorContext.MockPayload("D"));
770
771         actorContext.getReplicatedLog().append(entry);
772
773         //update follower timestamp
774         leader.markFollowerActive(FOLLOWER_ID);
775
776         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
777         // installed with a SendInstallSnapshot
778         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
779
780         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
781
782         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
783         assertEquals(3, cs.getLastAppliedIndex());
784         assertEquals(1, cs.getLastAppliedTerm());
785         assertEquals(4, cs.getLastIndex());
786         assertEquals(2, cs.getLastTerm());
787
788         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
789         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
790
791         MessageCollectorActor.clearMessages(followerActor);
792
793         // Sending Replicate message should not initiate another capture since the first is in progress.
794         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
795         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
796
797         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
798         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
799         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
800
801         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
802         final byte[] bytes = new byte[]{1, 2, 3};
803         installSnapshotStream.get().get().write(bytes);
804         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
805                 Runtime.getRuntime().totalMemory());
806         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
807
808         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
809         MessageCollectorActor.clearMessages(followerActor);
810         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
811         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
812     }
813
814
815     @Test
816     public void testInstallSnapshot() {
817         logStart("testInstallSnapshot");
818
819         final MockRaftActorContext actorContext = createActorContextWithFollower();
820
821         Map<String, String> leadersSnapshot = new HashMap<>();
822         leadersSnapshot.put("1", "A");
823         leadersSnapshot.put("2", "B");
824         leadersSnapshot.put("3", "C");
825
826         //clears leaders log
827         actorContext.getReplicatedLog().removeFrom(0);
828
829         final int lastAppliedIndex = 3;
830         final int snapshotIndex = 2;
831         final int snapshotTerm = 1;
832         final int currentTerm = 2;
833
834         // set the snapshot variables in replicatedlog
835         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
836         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
837         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
838         actorContext.setCommitIndex(lastAppliedIndex);
839         actorContext.setLastApplied(lastAppliedIndex);
840
841         leader = new Leader(actorContext);
842
843         // Initial heartbeat.
844         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
845
846         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
847         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
848
849         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
850         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
851                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
852
853         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
854                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
855
856         assertTrue(raftBehavior instanceof Leader);
857
858         // check if installsnapshot gets called with the correct values.
859
860         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
861                 InstallSnapshot.class);
862
863         assertNotNull(installSnapshot.getData());
864         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
865         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
866
867         assertEquals(currentTerm, installSnapshot.getTerm());
868     }
869
870     @Test
871     public void testForceInstallSnapshot() {
872         logStart("testForceInstallSnapshot");
873
874         final MockRaftActorContext actorContext = createActorContextWithFollower();
875
876         Map<String, String> leadersSnapshot = new HashMap<>();
877         leadersSnapshot.put("1", "A");
878         leadersSnapshot.put("2", "B");
879         leadersSnapshot.put("3", "C");
880
881         final int lastAppliedIndex = 3;
882         final int snapshotIndex = -1;
883         final int snapshotTerm = -1;
884         final int currentTerm = 2;
885
886         // set the snapshot variables in replicatedlog
887         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
888         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
889         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
890         actorContext.setCommitIndex(lastAppliedIndex);
891         actorContext.setLastApplied(lastAppliedIndex);
892
893         leader = new Leader(actorContext);
894
895         // Initial heartbeat.
896         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
897
898         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
899         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
900
901         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
902         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
903                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
904
905         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
906                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
907
908         assertTrue(raftBehavior instanceof Leader);
909
910         // check if installsnapshot gets called with the correct values.
911
912         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
913                 InstallSnapshot.class);
914
915         assertNotNull(installSnapshot.getData());
916         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
917         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
918
919         assertEquals(currentTerm, installSnapshot.getTerm());
920     }
921
922     @Test
923     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
924         logStart("testHandleInstallSnapshotReplyLastChunk");
925
926         MockRaftActorContext actorContext = createActorContextWithFollower();
927
928         final int commitIndex = 3;
929         final int snapshotIndex = 2;
930         final int snapshotTerm = 1;
931         final int currentTerm = 2;
932
933         actorContext.setCommitIndex(commitIndex);
934
935         leader = new Leader(actorContext);
936         actorContext.setCurrentBehavior(leader);
937
938         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
939         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
940
941         // Ignore initial heartbeat.
942         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
943
944         Map<String, String> leadersSnapshot = new HashMap<>();
945         leadersSnapshot.put("1", "A");
946         leadersSnapshot.put("2", "B");
947         leadersSnapshot.put("3", "C");
948
949         // set the snapshot variables in replicatedlog
950
951         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
952         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
953         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
954
955         ByteString bs = toByteString(leadersSnapshot);
956         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
957                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
958                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
959         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
960                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
961         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
962         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
963         while (!fts.isLastChunk(fts.getChunkIndex())) {
964             fts.getNextChunk();
965             fts.incrementChunkIndex();
966         }
967
968         //clears leaders log
969         actorContext.getReplicatedLog().removeFrom(0);
970
971         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
972                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
973
974         assertTrue(raftBehavior instanceof Leader);
975
976         assertEquals(1, leader.followerLogSize());
977         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
978         assertNotNull(fli);
979         assertNull(fli.getInstallSnapshotState());
980         assertEquals(commitIndex, fli.getMatchIndex());
981         assertEquals(commitIndex + 1, fli.getNextIndex());
982         assertFalse(leader.hasSnapshot());
983     }
984
985     @Test
986     public void testSendSnapshotfromInstallSnapshotReply() {
987         logStart("testSendSnapshotfromInstallSnapshotReply");
988
989         MockRaftActorContext actorContext = createActorContextWithFollower();
990
991         final int commitIndex = 3;
992         final int snapshotIndex = 2;
993         final int snapshotTerm = 1;
994         final int currentTerm = 2;
995
996         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
997             @Override
998             public int getSnapshotChunkSize() {
999                 return 50;
1000             }
1001         };
1002         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1003         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1004
1005         actorContext.setConfigParams(configParams);
1006         actorContext.setCommitIndex(commitIndex);
1007
1008         leader = new Leader(actorContext);
1009         actorContext.setCurrentBehavior(leader);
1010
1011         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1012         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1013
1014         Map<String, String> leadersSnapshot = new HashMap<>();
1015         leadersSnapshot.put("1", "A");
1016         leadersSnapshot.put("2", "B");
1017         leadersSnapshot.put("3", "C");
1018
1019         // set the snapshot variables in replicatedlog
1020         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1021         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1022         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1023
1024         ByteString bs = toByteString(leadersSnapshot);
1025         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1026                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1027                 -1, null, null);
1028
1029         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1030
1031         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1032                 InstallSnapshot.class);
1033
1034         assertEquals(1, installSnapshot.getChunkIndex());
1035         assertEquals(3, installSnapshot.getTotalChunks());
1036
1037         followerActor.underlyingActor().clear();
1038         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1039                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1040
1041         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1042
1043         assertEquals(2, installSnapshot.getChunkIndex());
1044         assertEquals(3, installSnapshot.getTotalChunks());
1045
1046         followerActor.underlyingActor().clear();
1047         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1048                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1049
1050         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1051
1052         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1053         followerActor.underlyingActor().clear();
1054         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1055                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1056
1057         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1058
1059         assertNull(installSnapshot);
1060     }
1061
1062
1063     @Test
1064     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1065         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1066
1067         MockRaftActorContext actorContext = createActorContextWithFollower();
1068
1069         final int commitIndex = 3;
1070         final int snapshotIndex = 2;
1071         final int snapshotTerm = 1;
1072         final int currentTerm = 2;
1073
1074         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1075             @Override
1076             public int getSnapshotChunkSize() {
1077                 return 50;
1078             }
1079         });
1080
1081         actorContext.setCommitIndex(commitIndex);
1082
1083         leader = new Leader(actorContext);
1084
1085         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1086         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1087
1088         Map<String, String> leadersSnapshot = new HashMap<>();
1089         leadersSnapshot.put("1", "A");
1090         leadersSnapshot.put("2", "B");
1091         leadersSnapshot.put("3", "C");
1092
1093         // set the snapshot variables in replicatedlog
1094         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1095         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1096         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1097
1098         ByteString bs = toByteString(leadersSnapshot);
1099         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1100                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1101                 -1, null, null);
1102
1103         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1104         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1105
1106         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1107                 InstallSnapshot.class);
1108
1109         assertEquals(1, installSnapshot.getChunkIndex());
1110         assertEquals(3, installSnapshot.getTotalChunks());
1111
1112         followerActor.underlyingActor().clear();
1113
1114         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1115                 FOLLOWER_ID, -1, false));
1116
1117         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1118                 TimeUnit.MILLISECONDS);
1119
1120         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1121
1122         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1123
1124         assertEquals(1, installSnapshot.getChunkIndex());
1125         assertEquals(3, installSnapshot.getTotalChunks());
1126     }
1127
1128     @Test
1129     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1130         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1131
1132         MockRaftActorContext actorContext = createActorContextWithFollower();
1133
1134         final int commitIndex = 3;
1135         final int snapshotIndex = 2;
1136         final int snapshotTerm = 1;
1137         final int currentTerm = 2;
1138
1139         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1140             @Override
1141             public int getSnapshotChunkSize() {
1142                 return 50;
1143             }
1144         });
1145
1146         actorContext.setCommitIndex(commitIndex);
1147
1148         leader = new Leader(actorContext);
1149
1150         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1151         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1152
1153         Map<String, String> leadersSnapshot = new HashMap<>();
1154         leadersSnapshot.put("1", "A");
1155         leadersSnapshot.put("2", "B");
1156         leadersSnapshot.put("3", "C");
1157
1158         // set the snapshot variables in replicatedlog
1159         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1160         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1161         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1162
1163         ByteString bs = toByteString(leadersSnapshot);
1164         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1165                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1166                 -1, null, null);
1167
1168         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1169
1170         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1171                 InstallSnapshot.class);
1172
1173         assertEquals(1, installSnapshot.getChunkIndex());
1174         assertEquals(3, installSnapshot.getTotalChunks());
1175         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1176                 installSnapshot.getLastChunkHashCode().get().intValue());
1177
1178         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1179
1180         followerActor.underlyingActor().clear();
1181
1182         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1183                 FOLLOWER_ID, 1, true));
1184
1185         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1186
1187         assertEquals(2, installSnapshot.getChunkIndex());
1188         assertEquals(3, installSnapshot.getTotalChunks());
1189         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1190     }
1191
1192     @Test
1193     public void testLeaderInstallSnapshotState() throws IOException {
1194         logStart("testLeaderInstallSnapshotState");
1195
1196         Map<String, String> leadersSnapshot = new HashMap<>();
1197         leadersSnapshot.put("1", "A");
1198         leadersSnapshot.put("2", "B");
1199         leadersSnapshot.put("3", "C");
1200
1201         ByteString bs = toByteString(leadersSnapshot);
1202         byte[] barray = bs.toByteArray();
1203
1204         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1205         fts.setSnapshotBytes(ByteSource.wrap(barray));
1206
1207         assertEquals(bs.size(), barray.length);
1208
1209         int chunkIndex = 0;
1210         for (int i = 0; i < barray.length; i = i + 50) {
1211             int length = i + 50;
1212             chunkIndex++;
1213
1214             if (i + 50 > barray.length) {
1215                 length = barray.length;
1216             }
1217
1218             byte[] chunk = fts.getNextChunk();
1219             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1220             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1221
1222             fts.markSendStatus(true);
1223             if (!fts.isLastChunk(chunkIndex)) {
1224                 fts.incrementChunkIndex();
1225             }
1226         }
1227
1228         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1229         fts.close();
1230     }
1231
1232     @Override
1233     protected Leader createBehavior(final RaftActorContext actorContext) {
1234         return new Leader(actorContext);
1235     }
1236
1237     @Override
1238     protected MockRaftActorContext createActorContext() {
1239         return createActorContext(leaderActor);
1240     }
1241
1242     @Override
1243     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1244         return createActorContext(LEADER_ID, actorRef);
1245     }
1246
1247     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1248         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1249         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1250         configParams.setElectionTimeoutFactor(100000);
1251         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1252         context.setConfigParams(configParams);
1253         context.setPayloadVersion(payloadVersion);
1254         return context;
1255     }
1256
1257     private MockRaftActorContext createActorContextWithFollower() {
1258         MockRaftActorContext actorContext = createActorContext();
1259         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1260                 followerActor.path().toString()).build());
1261         return actorContext;
1262     }
1263
1264     private MockRaftActorContext createFollowerActorContextWithLeader() {
1265         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1266         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1267         followerConfig.setElectionTimeoutFactor(10000);
1268         followerActorContext.setConfigParams(followerConfig);
1269         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1270         return followerActorContext;
1271     }
1272
1273     @Test
1274     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1275         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1276
1277         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1278
1279         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1280
1281         Follower follower = new Follower(followerActorContext);
1282         followerActor.underlyingActor().setBehavior(follower);
1283         followerActorContext.setCurrentBehavior(follower);
1284
1285         Map<String, String> peerAddresses = new HashMap<>();
1286         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1287
1288         leaderActorContext.setPeerAddresses(peerAddresses);
1289
1290         leaderActorContext.getReplicatedLog().removeFrom(0);
1291
1292         //create 3 entries
1293         leaderActorContext.setReplicatedLog(
1294                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1295
1296         leaderActorContext.setCommitIndex(1);
1297
1298         followerActorContext.getReplicatedLog().removeFrom(0);
1299
1300         // follower too has the exact same log entries and has the same commit index
1301         followerActorContext.setReplicatedLog(
1302                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1303
1304         followerActorContext.setCommitIndex(1);
1305
1306         leader = new Leader(leaderActorContext);
1307         leaderActorContext.setCurrentBehavior(leader);
1308
1309         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1310
1311         assertEquals(-1, appendEntries.getLeaderCommit());
1312         assertEquals(0, appendEntries.getEntries().size());
1313         assertEquals(0, appendEntries.getPrevLogIndex());
1314
1315         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1316                 leaderActor, AppendEntriesReply.class);
1317
1318         assertEquals(2, appendEntriesReply.getLogLastIndex());
1319         assertEquals(1, appendEntriesReply.getLogLastTerm());
1320
1321         // follower returns its next index
1322         assertEquals(2, appendEntriesReply.getLogLastIndex());
1323         assertEquals(1, appendEntriesReply.getLogLastTerm());
1324
1325         follower.close();
1326     }
1327
1328     @Test
1329     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1330         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1331
1332         final MockRaftActorContext leaderActorContext = createActorContext();
1333
1334         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1335         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1336
1337         Follower follower = new Follower(followerActorContext);
1338         followerActor.underlyingActor().setBehavior(follower);
1339         followerActorContext.setCurrentBehavior(follower);
1340
1341         Map<String, String> leaderPeerAddresses = new HashMap<>();
1342         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1343
1344         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1345
1346         leaderActorContext.getReplicatedLog().removeFrom(0);
1347
1348         leaderActorContext.setReplicatedLog(
1349                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1350
1351         leaderActorContext.setCommitIndex(1);
1352
1353         followerActorContext.getReplicatedLog().removeFrom(0);
1354
1355         followerActorContext.setReplicatedLog(
1356                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1357
1358         // follower has the same log entries but its commit index > leaders commit index
1359         followerActorContext.setCommitIndex(2);
1360
1361         leader = new Leader(leaderActorContext);
1362
1363         // Initial heartbeat
1364         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1365
1366         assertEquals(-1, appendEntries.getLeaderCommit());
1367         assertEquals(0, appendEntries.getEntries().size());
1368         assertEquals(0, appendEntries.getPrevLogIndex());
1369
1370         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1371                 leaderActor, AppendEntriesReply.class);
1372
1373         assertEquals(2, appendEntriesReply.getLogLastIndex());
1374         assertEquals(1, appendEntriesReply.getLogLastTerm());
1375
1376         leaderActor.underlyingActor().setBehavior(follower);
1377         leader.handleMessage(followerActor, appendEntriesReply);
1378
1379         leaderActor.underlyingActor().clear();
1380         followerActor.underlyingActor().clear();
1381
1382         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1383                 TimeUnit.MILLISECONDS);
1384
1385         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1386
1387         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1388
1389         assertEquals(2, appendEntries.getLeaderCommit());
1390         assertEquals(0, appendEntries.getEntries().size());
1391         assertEquals(2, appendEntries.getPrevLogIndex());
1392
1393         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1394
1395         assertEquals(2, appendEntriesReply.getLogLastIndex());
1396         assertEquals(1, appendEntriesReply.getLogLastTerm());
1397
1398         assertEquals(2, followerActorContext.getCommitIndex());
1399
1400         follower.close();
1401     }
1402
1403     @Test
1404     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1405         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1406
1407         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1408         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1409                 new FiniteDuration(1000, TimeUnit.SECONDS));
1410
1411         leaderActorContext.setReplicatedLog(
1412                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1413         long leaderCommitIndex = 2;
1414         leaderActorContext.setCommitIndex(leaderCommitIndex);
1415         leaderActorContext.setLastApplied(leaderCommitIndex);
1416
1417         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1418         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1419
1420         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1421
1422         followerActorContext.setReplicatedLog(
1423                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1424         followerActorContext.setCommitIndex(0);
1425         followerActorContext.setLastApplied(0);
1426
1427         Follower follower = new Follower(followerActorContext);
1428         followerActor.underlyingActor().setBehavior(follower);
1429
1430         leader = new Leader(leaderActorContext);
1431
1432         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1433         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1434                 AppendEntriesReply.class);
1435
1436         MessageCollectorActor.clearMessages(followerActor);
1437         MessageCollectorActor.clearMessages(leaderActor);
1438
1439         // Verify initial AppendEntries sent.
1440         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1441         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1442         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1443
1444         leaderActor.underlyingActor().setBehavior(leader);
1445
1446         leader.handleMessage(followerActor, appendEntriesReply);
1447
1448         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1449         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1450
1451         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1452         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1453         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1454
1455         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1456         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1457                 appendEntries.getEntries().get(0).getData());
1458         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1459         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1460                 appendEntries.getEntries().get(1).getData());
1461
1462         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1463         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1464
1465         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1466
1467         ApplyState applyState = applyStateList.get(0);
1468         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1469         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1470         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1471                 applyState.getReplicatedLogEntry().getData());
1472
1473         applyState = applyStateList.get(1);
1474         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1475         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1476         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1477                 applyState.getReplicatedLogEntry().getData());
1478
1479         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1480         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1481     }
1482
1483     @Test
1484     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1485         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1486
1487         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1488         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1489                 new FiniteDuration(1000, TimeUnit.SECONDS));
1490
1491         leaderActorContext.setReplicatedLog(
1492                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1493         long leaderCommitIndex = 1;
1494         leaderActorContext.setCommitIndex(leaderCommitIndex);
1495         leaderActorContext.setLastApplied(leaderCommitIndex);
1496
1497         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1498         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1499
1500         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1501
1502         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1503         followerActorContext.setCommitIndex(-1);
1504         followerActorContext.setLastApplied(-1);
1505
1506         Follower follower = new Follower(followerActorContext);
1507         followerActor.underlyingActor().setBehavior(follower);
1508         followerActorContext.setCurrentBehavior(follower);
1509
1510         leader = new Leader(leaderActorContext);
1511
1512         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1513         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1514                 AppendEntriesReply.class);
1515
1516         MessageCollectorActor.clearMessages(followerActor);
1517         MessageCollectorActor.clearMessages(leaderActor);
1518
1519         // Verify initial AppendEntries sent with the leader's current commit index.
1520         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1521         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1522         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1523
1524         leaderActor.underlyingActor().setBehavior(leader);
1525         leaderActorContext.setCurrentBehavior(leader);
1526
1527         leader.handleMessage(followerActor, appendEntriesReply);
1528
1529         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1530         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1531
1532         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1533         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1534         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1535
1536         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1537         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1538                 appendEntries.getEntries().get(0).getData());
1539         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1540         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1541                 appendEntries.getEntries().get(1).getData());
1542
1543         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1544         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1545
1546         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1547
1548         ApplyState applyState = applyStateList.get(0);
1549         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1550         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1551         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1552                 applyState.getReplicatedLogEntry().getData());
1553
1554         applyState = applyStateList.get(1);
1555         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1556         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1557         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1558                 applyState.getReplicatedLogEntry().getData());
1559
1560         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1561         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1562     }
1563
1564     @Test
1565     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1566         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1567
1568         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1569         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1570                 new FiniteDuration(1000, TimeUnit.SECONDS));
1571
1572         leaderActorContext.setReplicatedLog(
1573                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1574         long leaderCommitIndex = 1;
1575         leaderActorContext.setCommitIndex(leaderCommitIndex);
1576         leaderActorContext.setLastApplied(leaderCommitIndex);
1577
1578         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1579         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1580
1581         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1582
1583         followerActorContext.setReplicatedLog(
1584                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1585         followerActorContext.setCommitIndex(-1);
1586         followerActorContext.setLastApplied(-1);
1587
1588         Follower follower = new Follower(followerActorContext);
1589         followerActor.underlyingActor().setBehavior(follower);
1590         followerActorContext.setCurrentBehavior(follower);
1591
1592         leader = new Leader(leaderActorContext);
1593
1594         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1595         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1596                 AppendEntriesReply.class);
1597
1598         MessageCollectorActor.clearMessages(followerActor);
1599         MessageCollectorActor.clearMessages(leaderActor);
1600
1601         // Verify initial AppendEntries sent with the leader's current commit index.
1602         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1603         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1604         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1605
1606         leaderActor.underlyingActor().setBehavior(leader);
1607         leaderActorContext.setCurrentBehavior(leader);
1608
1609         leader.handleMessage(followerActor, appendEntriesReply);
1610
1611         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1612         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1613
1614         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1615         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1616         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1617
1618         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1619         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1620         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1621                 appendEntries.getEntries().get(0).getData());
1622         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1623         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1624         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1625                 appendEntries.getEntries().get(1).getData());
1626
1627         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1628         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1629
1630         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1631
1632         ApplyState applyState = applyStateList.get(0);
1633         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1634         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1635         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1636                 applyState.getReplicatedLogEntry().getData());
1637
1638         applyState = applyStateList.get(1);
1639         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1640         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1641         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1642                 applyState.getReplicatedLogEntry().getData());
1643
1644         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1645         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1646         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1647     }
1648
1649     @Test
1650     public void testHandleAppendEntriesReplyWithNewerTerm() {
1651         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1652
1653         MockRaftActorContext leaderActorContext = createActorContext();
1654         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1655                 new FiniteDuration(10000, TimeUnit.SECONDS));
1656
1657         leaderActorContext.setReplicatedLog(
1658                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1659
1660         leader = new Leader(leaderActorContext);
1661         leaderActor.underlyingActor().setBehavior(leader);
1662         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1663
1664         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1665                 AppendEntriesReply.class);
1666
1667         assertEquals(false, appendEntriesReply.isSuccess());
1668         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1669
1670         MessageCollectorActor.clearMessages(leaderActor);
1671     }
1672
1673     @Test
1674     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1675         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1676
1677         MockRaftActorContext leaderActorContext = createActorContext();
1678         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1679                 new FiniteDuration(10000, TimeUnit.SECONDS));
1680
1681         leaderActorContext.setReplicatedLog(
1682                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1683         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1684
1685         leader = new Leader(leaderActorContext);
1686         leaderActor.underlyingActor().setBehavior(leader);
1687         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1688
1689         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1690                 AppendEntriesReply.class);
1691
1692         assertEquals(false, appendEntriesReply.isSuccess());
1693         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1694
1695         MessageCollectorActor.clearMessages(leaderActor);
1696     }
1697
1698     @Test
1699     public void testHandleAppendEntriesReplySuccess() {
1700         logStart("testHandleAppendEntriesReplySuccess");
1701
1702         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1703
1704         leaderActorContext.setReplicatedLog(
1705                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1706
1707         leaderActorContext.setCommitIndex(1);
1708         leaderActorContext.setLastApplied(1);
1709         leaderActorContext.getTermInformation().update(1, "leader");
1710
1711         leader = new Leader(leaderActorContext);
1712
1713         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1714
1715         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1716         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1717
1718         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1719
1720         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1721
1722         assertEquals(RaftState.Leader, raftActorBehavior.state());
1723
1724         assertEquals(2, leaderActorContext.getCommitIndex());
1725
1726         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1727                 leaderActor, ApplyJournalEntries.class);
1728
1729         assertEquals(2, leaderActorContext.getLastApplied());
1730
1731         assertEquals(2, applyJournalEntries.getToIndex());
1732
1733         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1734                 ApplyState.class);
1735
1736         assertEquals(1,applyStateList.size());
1737
1738         ApplyState applyState = applyStateList.get(0);
1739
1740         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1741
1742         assertEquals(2, followerInfo.getMatchIndex());
1743         assertEquals(3, followerInfo.getNextIndex());
1744         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1745         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1746     }
1747
1748     @Test
1749     public void testHandleAppendEntriesReplyUnknownFollower() {
1750         logStart("testHandleAppendEntriesReplyUnknownFollower");
1751
1752         MockRaftActorContext leaderActorContext = createActorContext();
1753
1754         leader = new Leader(leaderActorContext);
1755
1756         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1757
1758         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1759
1760         assertEquals(RaftState.Leader, raftActorBehavior.state());
1761     }
1762
1763     @Test
1764     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1765         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1766
1767         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1768         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1769                 new FiniteDuration(1000, TimeUnit.SECONDS));
1770         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1771
1772         leaderActorContext.setReplicatedLog(
1773                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1774         long leaderCommitIndex = 3;
1775         leaderActorContext.setCommitIndex(leaderCommitIndex);
1776         leaderActorContext.setLastApplied(leaderCommitIndex);
1777
1778         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1779         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1780         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1781         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1782
1783         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1784
1785         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1786         followerActorContext.setCommitIndex(-1);
1787         followerActorContext.setLastApplied(-1);
1788
1789         Follower follower = new Follower(followerActorContext);
1790         followerActor.underlyingActor().setBehavior(follower);
1791         followerActorContext.setCurrentBehavior(follower);
1792
1793         leader = new Leader(leaderActorContext);
1794
1795         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1796         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1797                 AppendEntriesReply.class);
1798
1799         MessageCollectorActor.clearMessages(followerActor);
1800         MessageCollectorActor.clearMessages(leaderActor);
1801
1802         // Verify initial AppendEntries sent with the leader's current commit index.
1803         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1804         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1805         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1806
1807         leaderActor.underlyingActor().setBehavior(leader);
1808         leaderActorContext.setCurrentBehavior(leader);
1809
1810         leader.handleMessage(followerActor, appendEntriesReply);
1811
1812         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1813                 AppendEntries.class, 2);
1814         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1815
1816         appendEntries = appendEntriesList.get(0);
1817         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1818         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1819         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1820
1821         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1822         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1823                 appendEntries.getEntries().get(0).getData());
1824         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1825         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1826                 appendEntries.getEntries().get(1).getData());
1827
1828         appendEntries = appendEntriesList.get(1);
1829         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1830         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1831         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1832
1833         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1834         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1835                 appendEntries.getEntries().get(0).getData());
1836         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1837         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1838                 appendEntries.getEntries().get(1).getData());
1839
1840         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1841         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1842
1843         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1844
1845         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1846         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1847     }
1848
1849     @Test
1850     public void testHandleRequestVoteReply() {
1851         logStart("testHandleRequestVoteReply");
1852
1853         MockRaftActorContext leaderActorContext = createActorContext();
1854
1855         leader = new Leader(leaderActorContext);
1856
1857         // Should be a no-op.
1858         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1859                 new RequestVoteReply(1, true));
1860
1861         assertEquals(RaftState.Leader, raftActorBehavior.state());
1862
1863         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1864
1865         assertEquals(RaftState.Leader, raftActorBehavior.state());
1866     }
1867
1868     @Test
1869     public void testIsolatedLeaderCheckNoFollowers() {
1870         logStart("testIsolatedLeaderCheckNoFollowers");
1871
1872         MockRaftActorContext leaderActorContext = createActorContext();
1873
1874         leader = new Leader(leaderActorContext);
1875         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1876         assertTrue(newBehavior instanceof Leader);
1877     }
1878
1879     @Test
1880     public void testIsolatedLeaderCheckNoVotingFollowers() {
1881         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1882
1883         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1884         Follower follower = new Follower(followerActorContext);
1885         followerActor.underlyingActor().setBehavior(follower);
1886
1887         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1888         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1889                 new FiniteDuration(1000, TimeUnit.SECONDS));
1890         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1891
1892         leader = new Leader(leaderActorContext);
1893         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1894         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1895         assertTrue("Expected Leader", newBehavior instanceof Leader);
1896     }
1897
1898     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1899         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1900         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1901
1902         MockRaftActorContext leaderActorContext = createActorContext();
1903
1904         Map<String, String> peerAddresses = new HashMap<>();
1905         peerAddresses.put("follower-1", followerActor1.path().toString());
1906         peerAddresses.put("follower-2", followerActor2.path().toString());
1907
1908         leaderActorContext.setPeerAddresses(peerAddresses);
1909         leaderActorContext.setRaftPolicy(raftPolicy);
1910
1911         leader = new Leader(leaderActorContext);
1912
1913         leader.markFollowerActive("follower-1");
1914         leader.markFollowerActive("follower-2");
1915         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1916         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1917
1918         // kill 1 follower and verify if that got killed
1919         final TestKit probe = new TestKit(getSystem());
1920         probe.watch(followerActor1);
1921         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1922         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1923         assertEquals(termMsg1.getActor(), followerActor1);
1924
1925         leader.markFollowerInActive("follower-1");
1926         leader.markFollowerActive("follower-2");
1927         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1928         assertTrue("Behavior not instance of Leader when majority of followers are active",
1929                 newBehavior instanceof Leader);
1930
1931         // kill 2nd follower and leader should change to Isolated leader
1932         followerActor2.tell(PoisonPill.getInstance(), null);
1933         probe.watch(followerActor2);
1934         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1935         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1936         assertEquals(termMsg2.getActor(), followerActor2);
1937
1938         leader.markFollowerInActive("follower-2");
1939         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1940     }
1941
1942     @Test
1943     public void testIsolatedLeaderCheckTwoFollowers() {
1944         logStart("testIsolatedLeaderCheckTwoFollowers");
1945
1946         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1947
1948         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1949             newBehavior instanceof IsolatedLeader);
1950     }
1951
1952     @Test
1953     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1954         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1955
1956         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1957
1958         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1959                 newBehavior instanceof Leader);
1960     }
1961
1962     @Test
1963     public void testLaggingFollowerStarvation() {
1964         logStart("testLaggingFollowerStarvation");
1965
1966         String leaderActorId = actorFactory.generateActorId("leader");
1967         String follower1ActorId = actorFactory.generateActorId("follower");
1968         String follower2ActorId = actorFactory.generateActorId("follower");
1969
1970         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1971         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1972
1973         MockRaftActorContext leaderActorContext =
1974                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1975
1976         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1977         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1978         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1979
1980         leaderActorContext.setConfigParams(configParams);
1981
1982         leaderActorContext.setReplicatedLog(
1983                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1984
1985         Map<String, String> peerAddresses = new HashMap<>();
1986         peerAddresses.put(follower1ActorId,
1987                 follower1Actor.path().toString());
1988         peerAddresses.put(follower2ActorId,
1989                 follower2Actor.path().toString());
1990
1991         leaderActorContext.setPeerAddresses(peerAddresses);
1992         leaderActorContext.getTermInformation().update(1, leaderActorId);
1993
1994         leader = createBehavior(leaderActorContext);
1995
1996         leaderActor.underlyingActor().setBehavior(leader);
1997
1998         for (int i = 1; i < 6; i++) {
1999             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2000             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2001                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2002             assertTrue(newBehavior == leader);
2003             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2004         }
2005
2006         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2007         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2008
2009         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2010                 heartbeats.size() > 1);
2011
2012         // Check if follower-2 got AppendEntries during this time and was not starved
2013         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2014
2015         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2016                 appendEntries.size() > 1);
2017     }
2018
2019     @Test
2020     public void testReplicationConsensusWithNonVotingFollower() {
2021         logStart("testReplicationConsensusWithNonVotingFollower");
2022
2023         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2024         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2025                 new FiniteDuration(1000, TimeUnit.SECONDS));
2026
2027         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2028         leaderActorContext.setCommitIndex(-1);
2029         leaderActorContext.setLastApplied(-1);
2030
2031         String nonVotingFollowerId = "nonvoting-follower";
2032         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2033                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2034
2035         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2036                 VotingState.NON_VOTING);
2037
2038         leader = new Leader(leaderActorContext);
2039         leaderActorContext.setCurrentBehavior(leader);
2040
2041         // Ignore initial heartbeats
2042         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2043         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2044
2045         MessageCollectorActor.clearMessages(followerActor);
2046         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2047         MessageCollectorActor.clearMessages(leaderActor);
2048
2049         // Send a Replicate message and wait for AppendEntries.
2050         sendReplicate(leaderActorContext, 0);
2051
2052         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2053         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2054
2055         // Send reply only from the voting follower and verify consensus via ApplyState.
2056         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2057
2058         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2059
2060         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2061
2062         MessageCollectorActor.clearMessages(followerActor);
2063         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2064         MessageCollectorActor.clearMessages(leaderActor);
2065
2066         // Send another Replicate message
2067         sendReplicate(leaderActorContext, 1);
2068
2069         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2071                 AppendEntries.class);
2072         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2073         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2074
2075         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2076         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2077
2078         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2079
2080         // Send reply from the voting follower and verify consensus.
2081         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2082
2083         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2084     }
2085
2086     @Test
2087     public void testTransferLeadershipWithFollowerInSync() {
2088         logStart("testTransferLeadershipWithFollowerInSync");
2089
2090         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2091         leaderActorContext.setLastApplied(-1);
2092         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2093                 new FiniteDuration(1000, TimeUnit.SECONDS));
2094         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2095
2096         leader = new Leader(leaderActorContext);
2097         leaderActorContext.setCurrentBehavior(leader);
2098
2099         // Initial heartbeat
2100         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2101         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2102         MessageCollectorActor.clearMessages(followerActor);
2103
2104         sendReplicate(leaderActorContext, 0);
2105         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106
2107         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2108         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2109         MessageCollectorActor.clearMessages(followerActor);
2110
2111         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2112         leader.transferLeadership(mockTransferCohort);
2113
2114         verify(mockTransferCohort, never()).transferComplete();
2115         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2116         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2117         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2118
2119         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2120         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2121
2122         // Leader should force an election timeout
2123         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2124
2125         verify(mockTransferCohort).transferComplete();
2126     }
2127
2128     @Test
2129     public void testTransferLeadershipWithEmptyLog() {
2130         logStart("testTransferLeadershipWithEmptyLog");
2131
2132         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2133         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2134                 new FiniteDuration(1000, TimeUnit.SECONDS));
2135         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2136
2137         leader = new Leader(leaderActorContext);
2138         leaderActorContext.setCurrentBehavior(leader);
2139
2140         // Initial heartbeat
2141         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2142         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2143         MessageCollectorActor.clearMessages(followerActor);
2144
2145         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2146         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2147         leader.transferLeadership(mockTransferCohort);
2148
2149         verify(mockTransferCohort, never()).transferComplete();
2150         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2151         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2152
2153         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2154         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2155
2156         // Leader should force an election timeout
2157         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2158
2159         verify(mockTransferCohort).transferComplete();
2160     }
2161
2162     @Test
2163     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2164         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2165
2166         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2167         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2168                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2169
2170         leader = new Leader(leaderActorContext);
2171         leaderActorContext.setCurrentBehavior(leader);
2172
2173         // Initial heartbeat
2174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2175         MessageCollectorActor.clearMessages(followerActor);
2176
2177         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2178         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2179         leader.transferLeadership(mockTransferCohort);
2180
2181         verify(mockTransferCohort, never()).transferComplete();
2182
2183         // Sync up the follower.
2184         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2185         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2186         MessageCollectorActor.clearMessages(followerActor);
2187
2188         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2189                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2190         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2191         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2192         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2193
2194         // Leader should force an election timeout
2195         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2196
2197         verify(mockTransferCohort).transferComplete();
2198     }
2199
2200     @Test
2201     public void testTransferLeadershipWithFollowerSyncTimeout() {
2202         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2203
2204         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2205         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2206                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2207         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2208         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2209
2210         leader = new Leader(leaderActorContext);
2211         leaderActorContext.setCurrentBehavior(leader);
2212
2213         // Initial heartbeat
2214         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2215         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2216         MessageCollectorActor.clearMessages(followerActor);
2217
2218         sendReplicate(leaderActorContext, 0);
2219         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2220
2221         MessageCollectorActor.clearMessages(followerActor);
2222
2223         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2224         leader.transferLeadership(mockTransferCohort);
2225
2226         verify(mockTransferCohort, never()).transferComplete();
2227
2228         // Send heartbeats to time out the transfer.
2229         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2230             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2231                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2232             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2233         }
2234
2235         verify(mockTransferCohort).abortTransfer();
2236         verify(mockTransferCohort, never()).transferComplete();
2237         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2238     }
2239
2240     @Test
2241     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2242         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2243
2244         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2245                 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2246                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2247         final MockRaftActorContext.MockPayload largePayload =
2248                 new MockRaftActorContext.MockPayload("large", serializedSize);
2249
2250         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2251         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2252                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2253         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2254         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2255         leaderActorContext.setCommitIndex(-1);
2256         leaderActorContext.setLastApplied(-1);
2257
2258         leader = new Leader(leaderActorContext);
2259         leaderActorContext.setCurrentBehavior(leader);
2260
2261         // Send initial heartbeat reply so follower is marked active
2262         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2263         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2264         MessageCollectorActor.clearMessages(followerActor);
2265
2266         // Send normal payload first to prime commit index.
2267         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2268         sendReplicate(leaderActorContext, term, 0);
2269
2270         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2271         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2272         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2273
2274         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2275         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2276         MessageCollectorActor.clearMessages(followerActor);
2277
2278         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2279         sendReplicate(leaderActorContext, term, 1, largePayload);
2280
2281         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2282         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2283         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2284
2285         final Identifier slicingId = messageSlice.getIdentifier();
2286
2287         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2288         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2289         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2290         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2291         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2292         MessageCollectorActor.clearMessages(followerActor);
2293
2294         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2295
2296         // Sleep for the heartbeat interval so AppendEntries is sent.
2297         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2298                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2299
2300         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2301
2302         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2303         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2304         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2305         MessageCollectorActor.clearMessages(followerActor);
2306
2307         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2308
2309         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2310         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2311         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2312
2313         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2314
2315         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2316
2317         MessageCollectorActor.clearMessages(followerActor);
2318
2319         // Send another normal payload.
2320
2321         sendReplicate(leaderActorContext, term, 2);
2322
2323         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2324         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2325         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2326         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2327     }
2328
2329     @Test
2330     public void testLargePayloadSlicingExpiration() {
2331         logStart("testLargePayloadSlicingExpiration");
2332
2333         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2334         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2335                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2336         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2337         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2338         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2339         leaderActorContext.setCommitIndex(-1);
2340         leaderActorContext.setLastApplied(-1);
2341
2342         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2343         leader = new Leader(leaderActorContext);
2344         leaderActorContext.setCurrentBehavior(leader);
2345
2346         // Send initial heartbeat reply so follower is marked active
2347         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2348         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2349         MessageCollectorActor.clearMessages(followerActor);
2350
2351         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2352                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2353         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2354
2355         // Sleep for at least 3 * election timeout so the slicing state expires.
2356         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2357                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2358         MessageCollectorActor.clearMessages(followerActor);
2359
2360         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2361
2362         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2363         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2364         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2365
2366         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2367         MessageCollectorActor.clearMessages(followerActor);
2368
2369         // Send an AppendEntriesReply - this should restart the slicing.
2370
2371         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2372                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2373
2374         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2375
2376         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2377     }
2378
2379     @Override
2380     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2381             final ActorRef actorRef, final RaftRPC rpc) {
2382         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2383         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2384     }
2385
2386     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2387
2388         private final long electionTimeOutIntervalMillis;
2389         private final int snapshotChunkSize;
2390
2391         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2392             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2393             this.snapshotChunkSize = snapshotChunkSize;
2394         }
2395
2396         @Override
2397         public FiniteDuration getElectionTimeOutInterval() {
2398             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2399         }
2400
2401         @Override
2402         public int getSnapshotChunkSize() {
2403             return snapshotChunkSize;
2404         }
2405     }
2406 }