atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.OptionalInt;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.messaging.MessageSlice;
44 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
47 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
50 import org.opendaylight.controller.cluster.raft.RaftState;
51 import org.opendaylight.controller.cluster.raft.RaftVersions;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.VotingState;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
56 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
57 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
59 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
61 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
65 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.messages.Payload;
67 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
71 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
72 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
73 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
74 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
75 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import org.opendaylight.yangtools.concepts.Identifier;
78 import scala.concurrent.duration.FiniteDuration;
79
80 public class LeaderTest extends AbstractLeaderTest<Leader> {
81
82     static final String FOLLOWER_ID = "follower";
83     public static final String LEADER_ID = "leader";
84
85     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
86             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
87
88     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
90
91     private Leader leader;
92     private final short payloadVersion = 5;
93
94     @Override
95     @After
96     public void tearDown() {
97         if (leader != null) {
98             leader.close();
99         }
100
101         super.tearDown();
102     }
103
104     @Test
105     public void testHandleMessageForUnknownMessage() {
106         logStart("testHandleMessageForUnknownMessage");
107
108         leader = new Leader(createActorContext());
109
110         // handle message should null when it receives an unknown message
111         assertNull(leader.handleMessage(followerActor, "foo"));
112     }
113
114     @Test
115     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
116         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
117
118         MockRaftActorContext actorContext = createActorContextWithFollower();
119         actorContext.setCommitIndex(-1);
120         actorContext.setPayloadVersion(payloadVersion);
121
122         long term = 1;
123         actorContext.getTermInformation().update(term, "");
124
125         leader = new Leader(actorContext);
126         actorContext.setCurrentBehavior(leader);
127
128         // Leader should send an immediate heartbeat with no entries as follower is inactive.
129         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
130         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
131         assertEquals("getTerm", term, appendEntries.getTerm());
132         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
133         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
134         assertEquals("Entries size", 0, appendEntries.getEntries().size());
135         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
136
137         // The follower would normally reply - simulate that explicitly here.
138         leader.handleMessage(followerActor, new AppendEntriesReply(
139                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
140         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
141
142         followerActor.underlyingActor().clear();
143
144         // Sleep for the heartbeat interval so AppendEntries is sent.
145         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
146                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
147
148         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
149
150         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
152         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
153         assertEquals("Entries size", 1, appendEntries.getEntries().size());
154         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
155         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
156         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
157     }
158
159
160     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
161         return sendReplicate(actorContext, 1, index);
162     }
163
164     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
165             final long index) {
166         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
167     }
168
169     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
170             final Payload payload) {
171         actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload));
172         return leader.handleMessage(leaderActor, new Replicate(index, true, null, null));
173     }
174
175     @Test
176     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
177         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
178
179         MockRaftActorContext actorContext = createActorContextWithFollower();
180
181         long term = 1;
182         actorContext.getTermInformation().update(term, "");
183
184         leader = new Leader(actorContext);
185
186         // Leader will send an immediate heartbeat - ignore it.
187         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
188
189         // The follower would normally reply - simulate that explicitly here.
190         long lastIndex = actorContext.getReplicatedLog().lastIndex();
191         leader.handleMessage(followerActor, new AppendEntriesReply(
192                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
193         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
194
195         followerActor.underlyingActor().clear();
196
197         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
198
199         // State should not change
200         assertTrue(raftBehavior instanceof Leader);
201
202         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
203         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
204         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
205         assertEquals("Entries size", 1, appendEntries.getEntries().size());
206         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
207         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
208         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
209         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
210     }
211
212     @Test
213     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
214         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
215
216         MockRaftActorContext actorContext = createActorContextWithFollower();
217         actorContext.setCommitIndex(-1);
218         actorContext.setLastApplied(-1);
219
220         // The raft context is initialized with a couple log entries. However the commitIndex
221         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
222         // committed and applied. Now it regains leadership with a higher term (2).
223         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
224         long newTerm = prevTerm + 1;
225         actorContext.getTermInformation().update(newTerm, "");
226
227         leader = new Leader(actorContext);
228         actorContext.setCurrentBehavior(leader);
229
230         // Leader will send an immediate heartbeat - ignore it.
231         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
232
233         // The follower replies with the leader's current last index and term, simulating that it is
234         // up to date with the leader.
235         long lastIndex = actorContext.getReplicatedLog().lastIndex();
236         leader.handleMessage(followerActor, new AppendEntriesReply(
237                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
238
239         // The commit index should not get updated even though consensus was reached. This is b/c the
240         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
241         // from previous terms by counting replicas".
242         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
243
244         followerActor.underlyingActor().clear();
245
246         // Now replicate a new entry with the new term 2.
247         long newIndex = lastIndex + 1;
248         sendReplicate(actorContext, newTerm, newIndex);
249
250         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
251         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
252         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
253         assertEquals("Entries size", 1, appendEntries.getEntries().size());
254         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
255         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
256         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
257
258         // The follower replies with success. The leader should now update the commit index to the new index
259         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
260         // prior entries are committed indirectly".
261         leader.handleMessage(followerActor, new AppendEntriesReply(
262                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
263
264         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
265     }
266
267     @Test
268     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
269         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
270
271         MockRaftActorContext actorContext = createActorContextWithFollower();
272         actorContext.setRaftPolicy(createRaftPolicy(true, true));
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
291
292         // State should not change
293         assertTrue(raftBehavior instanceof Leader);
294
295         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
296         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
297         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
298         assertEquals("Entries size", 1, appendEntries.getEntries().size());
299         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
300         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
301         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
302         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
303     }
304
305     @Test
306     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
307         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
308
309         MockRaftActorContext actorContext = createActorContextWithFollower();
310         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
311             @Override
312             public FiniteDuration getHeartBeatInterval() {
313                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
314             }
315         });
316
317         long term = 1;
318         actorContext.getTermInformation().update(term, "");
319
320         leader = new Leader(actorContext);
321
322         // Leader will send an immediate heartbeat - ignore it.
323         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
324
325         // The follower would normally reply - simulate that explicitly here.
326         long lastIndex = actorContext.getReplicatedLog().lastIndex();
327         leader.handleMessage(followerActor, new AppendEntriesReply(
328                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
329         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
330
331         followerActor.underlyingActor().clear();
332
333         for (int i = 0; i < 5; i++) {
334             sendReplicate(actorContext, lastIndex + i + 1);
335         }
336
337         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
338         // We expect only 1 message to be sent because of two reasons,
339         // - an append entries reply was not received
340         // - the heartbeat interval has not expired
341         // In this scenario if multiple messages are sent they would likely be duplicates
342         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
343     }
344
345     @Test
346     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
347         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
348
349         MockRaftActorContext actorContext = createActorContextWithFollower();
350         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
351             @Override
352             public FiniteDuration getHeartBeatInterval() {
353                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
354             }
355         });
356
357         long term = 1;
358         actorContext.getTermInformation().update(term, "");
359
360         leader = new Leader(actorContext);
361
362         // Leader will send an immediate heartbeat - ignore it.
363         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
364
365         // The follower would normally reply - simulate that explicitly here.
366         long lastIndex = actorContext.getReplicatedLog().lastIndex();
367         leader.handleMessage(followerActor, new AppendEntriesReply(
368                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
369         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
370
371         followerActor.underlyingActor().clear();
372
373         for (int i = 0; i < 3; i++) {
374             sendReplicate(actorContext, lastIndex + i + 1);
375             leader.handleMessage(followerActor, new AppendEntriesReply(
376                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
377         }
378
379         // We are expecting six messages here -- a request to replicate and a consensus-reached message
380         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
381         assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
382         for (int i = 0; i < 3; i++) {
383             assertRequestEntry(lastIndex, allMessages, i);
384             assertCommitEntry(lastIndex, allMessages, i);
385         }
386
387         // Now perform another commit, eliciting a request to persist
388         sendReplicate(actorContext, lastIndex + 3 + 1);
389         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
390         // This elicits another message for request to replicate
391         assertEquals("The number of request entries collected", 7, allMessages.size());
392         assertRequestEntry(lastIndex, allMessages, 3);
393
394         sendReplicate(actorContext, lastIndex + 4 + 1);
395         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
396         assertEquals("The number of request entries collected", 7, allMessages.size());
397     }
398
399     private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
400             final int messageNr) {
401         final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
402         assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
403         assertEquals(List.of(), commitReq.getEntries());
404     }
405
406     private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
407             final int messageNr) {
408         final AppendEntries req = allMessages.get(2 * messageNr);
409         assertEquals(lastIndex + messageNr, req.getLeaderCommit());
410
411         final List<ReplicatedLogEntry> entries = req.getEntries();
412         assertEquals(1, entries.size());
413         assertEquals(messageNr + 2, entries.get(0).getIndex());
414     }
415
416     @Test
417     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
418         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
419
420         MockRaftActorContext actorContext = createActorContextWithFollower();
421         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
422             @Override
423             public FiniteDuration getHeartBeatInterval() {
424                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
425             }
426         });
427
428         long term = 1;
429         actorContext.getTermInformation().update(term, "");
430
431         leader = new Leader(actorContext);
432
433         // Leader will send an immediate heartbeat - ignore it.
434         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
435
436         // The follower would normally reply - simulate that explicitly here.
437         long lastIndex = actorContext.getReplicatedLog().lastIndex();
438         leader.handleMessage(followerActor, new AppendEntriesReply(
439                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
440         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
441
442         followerActor.underlyingActor().clear();
443
444         sendReplicate(actorContext, lastIndex + 1);
445
446         // Wait slightly longer than heartbeat duration
447         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
448
449         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
450
451         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
452         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
453
454         assertEquals(1, allMessages.get(0).getEntries().size());
455         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
456         assertEquals(1, allMessages.get(1).getEntries().size());
457         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
458
459     }
460
461     @Test
462     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
463         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
464
465         MockRaftActorContext actorContext = createActorContextWithFollower();
466         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
467             @Override
468             public FiniteDuration getHeartBeatInterval() {
469                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
470             }
471         });
472
473         long term = 1;
474         actorContext.getTermInformation().update(term, "");
475
476         leader = new Leader(actorContext);
477
478         // Leader will send an immediate heartbeat - ignore it.
479         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
480
481         // The follower would normally reply - simulate that explicitly here.
482         long lastIndex = actorContext.getReplicatedLog().lastIndex();
483         leader.handleMessage(followerActor, new AppendEntriesReply(
484                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
485         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
486
487         followerActor.underlyingActor().clear();
488
489         for (int i = 0; i < 3; i++) {
490             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
492         }
493
494         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
496     }
497
498     @Test
499     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
500         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
501
502         MockRaftActorContext actorContext = createActorContextWithFollower();
503         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
504             @Override
505             public FiniteDuration getHeartBeatInterval() {
506                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
507             }
508         });
509
510         long term = 1;
511         actorContext.getTermInformation().update(term, "");
512
513         leader = new Leader(actorContext);
514
515         // Leader will send an immediate heartbeat - ignore it.
516         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
517
518         // The follower would normally reply - simulate that explicitly here.
519         long lastIndex = actorContext.getReplicatedLog().lastIndex();
520         leader.handleMessage(followerActor, new AppendEntriesReply(
521                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
522         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
523
524         followerActor.underlyingActor().clear();
525
526         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
527         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
528         sendReplicate(actorContext, lastIndex + 1);
529
530         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
531         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
532
533         assertEquals(0, allMessages.get(0).getEntries().size());
534         assertEquals(1, allMessages.get(1).getEntries().size());
535     }
536
537
538     @Test
539     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
540         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
541
542         MockRaftActorContext actorContext = createActorContext();
543
544         leader = new Leader(actorContext);
545
546         actorContext.setLastApplied(0);
547
548         final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
549         final long term = actorContext.getTermInformation().getCurrentTerm();
550         final var data = new MockRaftActorContext.MockPayload("foo");
551
552         actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data));
553
554         final Identifier id = new MockIdentifier("state-id");
555         final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id));
556
557         // State should not change
558         assertTrue(raftBehavior instanceof Leader);
559
560         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
561
562         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
563         // one since lastApplied state is 0.
564         final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
565         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
566
567         for (int i = 0; i <= newLogIndex - 1; i++) {
568             ApplyState applyState = applyStateList.get(i);
569             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
570             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
571         }
572
573         ApplyState last = applyStateList.get((int) newLogIndex - 1);
574         assertEquals("getData", data, last.getReplicatedLogEntry().getData());
575         assertEquals("getIdentifier", id, last.getIdentifier());
576     }
577
578     @Test
579     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
580         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
581
582         final MockRaftActorContext actorContext = createActorContextWithFollower();
583
584         //clears leaders log
585         actorContext.getReplicatedLog().removeFrom(0);
586
587         final int commitIndex = 3;
588         final int snapshotIndex = 2;
589         final int snapshotTerm = 1;
590
591         // set the snapshot variables in replicatedlog
592         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
593         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
594         actorContext.setCommitIndex(commitIndex);
595         //set follower timeout to 2 mins, helps during debugging
596         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
597
598         leader = new Leader(actorContext);
599
600         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
601         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
602
603         //update follower timestamp
604         leader.markFollowerActive(FOLLOWER_ID);
605
606         ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
607         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
608                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
609                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
610         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
611                 actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
612         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
613         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
614
615         //send first chunk and no InstallSnapshotReply received yet
616         fts.getNextChunk();
617         fts.incrementChunkIndex();
618
619         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
620                 TimeUnit.MILLISECONDS);
621
622         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
623
624         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
625
626         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
627
628         //InstallSnapshotReply received
629         fts.markSendStatus(true);
630
631         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
632
633         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
634
635         assertEquals(commitIndex, is.getLastIncludedIndex());
636     }
637
638     @Test
639     public void testSendAppendEntriesSnapshotScenario() {
640         logStart("testSendAppendEntriesSnapshotScenario");
641
642         final MockRaftActorContext actorContext = createActorContextWithFollower();
643
644         Map<String, String> leadersSnapshot = new HashMap<>();
645         leadersSnapshot.put("1", "A");
646         leadersSnapshot.put("2", "B");
647         leadersSnapshot.put("3", "C");
648
649         //clears leaders log
650         actorContext.getReplicatedLog().removeFrom(0);
651
652         final int followersLastIndex = 2;
653         final int snapshotIndex = 3;
654         final int newEntryIndex = 4;
655         final int snapshotTerm = 1;
656         final int currentTerm = 2;
657
658         // set the snapshot variables in replicatedlog
659         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
660         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
661         actorContext.setCommitIndex(followersLastIndex);
662
663         leader = new Leader(actorContext);
664
665         // Leader will send an immediate heartbeat - ignore it.
666         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
667
668         // new entry
669         actorContext.getReplicatedLog().append(
670             new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
671
672         //update follower timestamp
673         leader.markFollowerActive(FOLLOWER_ID);
674
675         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
676         RaftActorBehavior raftBehavior = leader.handleMessage(
677                 leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
678
679         assertTrue(raftBehavior instanceof Leader);
680
681         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
682     }
683
684     @Test
685     public void testInitiateInstallSnapshot() {
686         logStart("testInitiateInstallSnapshot");
687
688         MockRaftActorContext actorContext = createActorContextWithFollower();
689
690         //clears leaders log
691         actorContext.getReplicatedLog().removeFrom(0);
692
693         final int followersLastIndex = 2;
694         final int snapshotIndex = 3;
695         final int newEntryIndex = 4;
696         final int snapshotTerm = 1;
697         final int currentTerm = 2;
698
699         // set the snapshot variables in replicatedlog
700         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
701         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
702         actorContext.setLastApplied(3);
703         actorContext.setCommitIndex(followersLastIndex);
704
705         leader = new Leader(actorContext);
706
707         // Leader will send an immediate heartbeat - ignore it.
708         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
709
710         // set the snapshot as absent and check if capture-snapshot is invoked.
711         leader.setSnapshotHolder(null);
712
713         // new entry
714         actorContext.getReplicatedLog().append(
715             new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
716
717         //update follower timestamp
718         leader.markFollowerActive(FOLLOWER_ID);
719
720         leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
721
722         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
723
724         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
725
726         assertEquals(3, cs.getLastAppliedIndex());
727         assertEquals(1, cs.getLastAppliedTerm());
728         assertEquals(4, cs.getLastIndex());
729         assertEquals(2, cs.getLastTerm());
730
731         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
732         leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
733
734         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
735     }
736
737     @Test
738     public void testInitiateForceInstallSnapshot() throws Exception {
739         logStart("testInitiateForceInstallSnapshot");
740
741         MockRaftActorContext actorContext = createActorContextWithFollower();
742
743         final int followersLastIndex = 2;
744         final int snapshotIndex = -1;
745         final int newEntryIndex = 4;
746         final int snapshotTerm = -1;
747         final int currentTerm = 2;
748
749         // set the snapshot variables in replicatedlog
750         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
751         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
752         actorContext.setLastApplied(3);
753         actorContext.setCommitIndex(followersLastIndex);
754
755         actorContext.getReplicatedLog().removeFrom(0);
756
757         AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
758         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
759
760         leader = new Leader(actorContext);
761         actorContext.setCurrentBehavior(leader);
762
763         // Leader will send an immediate heartbeat - ignore it.
764         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
765
766         // set the snapshot as absent and check if capture-snapshot is invoked.
767         leader.setSnapshotHolder(null);
768
769         for (int i = 0; i < 4; i++) {
770             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
771                     new MockRaftActorContext.MockPayload("X" + i)));
772         }
773
774         // new entry
775         actorContext.getReplicatedLog().append(
776             new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
777
778         //update follower timestamp
779         leader.markFollowerActive(FOLLOWER_ID);
780
781         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
782         // installed with a SendInstallSnapshot
783         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
784                 RaftVersions.CURRENT_VERSION));
785
786         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
787
788         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
789         assertEquals(3, cs.getLastAppliedIndex());
790         assertEquals(1, cs.getLastAppliedTerm());
791         assertEquals(4, cs.getLastIndex());
792         assertEquals(2, cs.getLastTerm());
793
794         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
795         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
796
797         MessageCollectorActor.clearMessages(followerActor);
798
799         // Sending Replicate message should not initiate another capture since the first is in progress.
800         leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
801         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
802
803         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
804         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
805                 RaftVersions.CURRENT_VERSION));
806         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
807
808         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
809         final byte[] bytes = new byte[]{1, 2, 3};
810         installSnapshotStream.get().orElseThrow().write(bytes);
811         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
812                 Runtime.getRuntime().totalMemory());
813         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
814
815         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
816         MessageCollectorActor.clearMessages(followerActor);
817         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
818                 RaftVersions.CURRENT_VERSION));
819         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
820     }
821
822
823     @Test
824     public void testInstallSnapshot() {
825         logStart("testInstallSnapshot");
826
827         final MockRaftActorContext actorContext = createActorContextWithFollower();
828
829         Map<String, String> leadersSnapshot = new HashMap<>();
830         leadersSnapshot.put("1", "A");
831         leadersSnapshot.put("2", "B");
832         leadersSnapshot.put("3", "C");
833
834         //clears leaders log
835         actorContext.getReplicatedLog().removeFrom(0);
836
837         final int lastAppliedIndex = 3;
838         final int snapshotIndex = 2;
839         final int snapshotTerm = 1;
840         final int currentTerm = 2;
841
842         // set the snapshot variables in replicatedlog
843         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
844         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
845         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
846         actorContext.setCommitIndex(lastAppliedIndex);
847         actorContext.setLastApplied(lastAppliedIndex);
848
849         leader = new Leader(actorContext);
850
851         // Initial heartbeat.
852         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
853
854         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
855         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
856
857         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
858         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
859                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
860
861         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
862                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
863
864         assertTrue(raftBehavior instanceof Leader);
865
866         // check if installsnapshot gets called with the correct values.
867
868         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
869                 InstallSnapshot.class);
870
871         assertNotNull(installSnapshot.getData());
872         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
873         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
874
875         assertEquals(currentTerm, installSnapshot.getTerm());
876     }
877
878     @Test
879     public void testForceInstallSnapshot() {
880         logStart("testForceInstallSnapshot");
881
882         final MockRaftActorContext actorContext = createActorContextWithFollower();
883
884         Map<String, String> leadersSnapshot = new HashMap<>();
885         leadersSnapshot.put("1", "A");
886         leadersSnapshot.put("2", "B");
887         leadersSnapshot.put("3", "C");
888
889         final int lastAppliedIndex = 3;
890         final int snapshotIndex = -1;
891         final int snapshotTerm = -1;
892         final int currentTerm = 2;
893
894         // set the snapshot variables in replicatedlog
895         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
896         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
897         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
898         actorContext.setCommitIndex(lastAppliedIndex);
899         actorContext.setLastApplied(lastAppliedIndex);
900
901         leader = new Leader(actorContext);
902
903         // Initial heartbeat.
904         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
905
906         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
907         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
908
909         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
910         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
911                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
912
913         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
914                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
915
916         assertTrue(raftBehavior instanceof Leader);
917
918         // check if installsnapshot gets called with the correct values.
919
920         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
921                 InstallSnapshot.class);
922
923         assertNotNull(installSnapshot.getData());
924         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
925         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
926
927         assertEquals(currentTerm, installSnapshot.getTerm());
928     }
929
930     @Test
931     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
932         logStart("testHandleInstallSnapshotReplyLastChunk");
933
934         MockRaftActorContext actorContext = createActorContextWithFollower();
935
936         final int commitIndex = 3;
937         final int snapshotIndex = 2;
938         final int snapshotTerm = 1;
939         final int currentTerm = 2;
940
941         actorContext.setCommitIndex(commitIndex);
942
943         leader = new Leader(actorContext);
944         actorContext.setCurrentBehavior(leader);
945
946         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
947         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
948
949         // Ignore initial heartbeat.
950         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
951
952         Map<String, String> leadersSnapshot = new HashMap<>();
953         leadersSnapshot.put("1", "A");
954         leadersSnapshot.put("2", "B");
955         leadersSnapshot.put("3", "C");
956
957         // set the snapshot variables in replicatedlog
958
959         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
960         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
961         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
962
963         ByteString bs = toByteString(leadersSnapshot);
964         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
965                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
966                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
967         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
968                 actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
969         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
970         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
971         while (!fts.isLastChunk(fts.getChunkIndex())) {
972             fts.getNextChunk();
973             fts.incrementChunkIndex();
974         }
975
976         //clears leaders log
977         actorContext.getReplicatedLog().removeFrom(0);
978
979         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
980                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
981
982         assertTrue(raftBehavior instanceof Leader);
983
984         assertEquals(1, leader.followerLogSize());
985         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
986         assertNotNull(fli);
987         assertNull(fli.getInstallSnapshotState());
988         assertEquals(commitIndex, fli.getMatchIndex());
989         assertEquals(commitIndex + 1, fli.getNextIndex());
990         assertFalse(leader.hasSnapshot());
991     }
992
993     @Test
994     public void testSendSnapshotfromInstallSnapshotReply() {
995         logStart("testSendSnapshotfromInstallSnapshotReply");
996
997         MockRaftActorContext actorContext = createActorContextWithFollower();
998
999         final int commitIndex = 3;
1000         final int snapshotIndex = 2;
1001         final int snapshotTerm = 1;
1002         final int currentTerm = 2;
1003
1004         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1005             @Override
1006             public int getMaximumMessageSliceSize() {
1007                 return 50;
1008             }
1009         };
1010         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1011         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1012
1013         actorContext.setConfigParams(configParams);
1014         actorContext.setCommitIndex(commitIndex);
1015
1016         leader = new Leader(actorContext);
1017         actorContext.setCurrentBehavior(leader);
1018
1019         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1020         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1021
1022         Map<String, String> leadersSnapshot = new HashMap<>();
1023         leadersSnapshot.put("1", "A");
1024         leadersSnapshot.put("2", "B");
1025         leadersSnapshot.put("3", "C");
1026
1027         // set the snapshot variables in replicatedlog
1028         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1029         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1030         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1031
1032         ByteString bs = toByteString(leadersSnapshot);
1033         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1034                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1035
1036         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1037
1038         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1039                 InstallSnapshot.class);
1040
1041         assertEquals(1, installSnapshot.getChunkIndex());
1042         assertEquals(3, installSnapshot.getTotalChunks());
1043
1044         followerActor.underlyingActor().clear();
1045         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1046                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1047
1048         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1049
1050         assertEquals(2, installSnapshot.getChunkIndex());
1051         assertEquals(3, installSnapshot.getTotalChunks());
1052
1053         followerActor.underlyingActor().clear();
1054         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1055                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1056
1057         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1058
1059         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1060         followerActor.underlyingActor().clear();
1061         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1062                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1063
1064         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1065
1066         assertNull(installSnapshot);
1067     }
1068
1069
1070     @Test
1071     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1072         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1073
1074         MockRaftActorContext actorContext = createActorContextWithFollower();
1075
1076         final int commitIndex = 3;
1077         final int snapshotIndex = 2;
1078         final int snapshotTerm = 1;
1079         final int currentTerm = 2;
1080
1081         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1082             @Override
1083             public int getMaximumMessageSliceSize() {
1084                 return 50;
1085             }
1086         });
1087
1088         actorContext.setCommitIndex(commitIndex);
1089
1090         leader = new Leader(actorContext);
1091
1092         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1093         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1094
1095         Map<String, String> leadersSnapshot = new HashMap<>();
1096         leadersSnapshot.put("1", "A");
1097         leadersSnapshot.put("2", "B");
1098         leadersSnapshot.put("3", "C");
1099
1100         // set the snapshot variables in replicatedlog
1101         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1102         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1103         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1104
1105         ByteString bs = toByteString(leadersSnapshot);
1106         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1107                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1108
1109         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1110         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1111
1112         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1113                 InstallSnapshot.class);
1114
1115         assertEquals(1, installSnapshot.getChunkIndex());
1116         assertEquals(3, installSnapshot.getTotalChunks());
1117
1118         followerActor.underlyingActor().clear();
1119
1120         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1121                 FOLLOWER_ID, -1, false));
1122
1123         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1124                 TimeUnit.MILLISECONDS);
1125
1126         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1127
1128         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1129
1130         assertEquals(1, installSnapshot.getChunkIndex());
1131         assertEquals(3, installSnapshot.getTotalChunks());
1132     }
1133
1134     @Test
1135     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1136         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1137
1138         MockRaftActorContext actorContext = createActorContextWithFollower();
1139
1140         final int commitIndex = 3;
1141         final int snapshotIndex = 2;
1142         final int snapshotTerm = 1;
1143         final int currentTerm = 2;
1144
1145         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1146             @Override
1147             public int getMaximumMessageSliceSize() {
1148                 return 50;
1149             }
1150         });
1151
1152         actorContext.setCommitIndex(commitIndex);
1153
1154         leader = new Leader(actorContext);
1155
1156         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1157         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1158
1159         Map<String, String> leadersSnapshot = new HashMap<>();
1160         leadersSnapshot.put("1", "A");
1161         leadersSnapshot.put("2", "B");
1162         leadersSnapshot.put("3", "C");
1163
1164         // set the snapshot variables in replicatedlog
1165         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1166         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1167         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1168
1169         ByteString bs = toByteString(leadersSnapshot);
1170         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1171                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1172
1173         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1174
1175         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1176                 InstallSnapshot.class);
1177
1178         assertEquals(1, installSnapshot.getChunkIndex());
1179         assertEquals(3, installSnapshot.getTotalChunks());
1180         assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
1181                 installSnapshot.getLastChunkHashCode());
1182
1183         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1184
1185         followerActor.underlyingActor().clear();
1186
1187         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1188                 FOLLOWER_ID, 1, true));
1189
1190         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1191
1192         assertEquals(2, installSnapshot.getChunkIndex());
1193         assertEquals(3, installSnapshot.getTotalChunks());
1194         assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
1195     }
1196
1197     @Test
1198     public void testLeaderInstallSnapshotState() throws IOException {
1199         logStart("testLeaderInstallSnapshotState");
1200
1201         Map<String, String> leadersSnapshot = new HashMap<>();
1202         leadersSnapshot.put("1", "A");
1203         leadersSnapshot.put("2", "B");
1204         leadersSnapshot.put("3", "C");
1205
1206         ByteString bs = toByteString(leadersSnapshot);
1207         byte[] barray = bs.toByteArray();
1208
1209         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1210         fts.setSnapshotBytes(ByteSource.wrap(barray));
1211
1212         assertEquals(bs.size(), barray.length);
1213
1214         int chunkIndex = 0;
1215         for (int i = 0; i < barray.length; i = i + 50) {
1216             int length = i + 50;
1217             chunkIndex++;
1218
1219             if (i + 50 > barray.length) {
1220                 length = barray.length;
1221             }
1222
1223             byte[] chunk = fts.getNextChunk();
1224             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1225             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1226
1227             fts.markSendStatus(true);
1228             if (!fts.isLastChunk(chunkIndex)) {
1229                 fts.incrementChunkIndex();
1230             }
1231         }
1232
1233         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1234         fts.close();
1235     }
1236
1237     @Override
1238     protected Leader createBehavior(final RaftActorContext actorContext) {
1239         return new Leader(actorContext);
1240     }
1241
1242     @Override
1243     protected MockRaftActorContext createActorContext() {
1244         return createActorContext(leaderActor);
1245     }
1246
1247     @Override
1248     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1249         return createActorContext(LEADER_ID, actorRef);
1250     }
1251
1252     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1253         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1254         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1255         configParams.setElectionTimeoutFactor(100000);
1256         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1257         context.setConfigParams(configParams);
1258         context.setPayloadVersion(payloadVersion);
1259         return context;
1260     }
1261
1262     private MockRaftActorContext createActorContextWithFollower() {
1263         MockRaftActorContext actorContext = createActorContext();
1264         actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
1265         return actorContext;
1266     }
1267
1268     private MockRaftActorContext createFollowerActorContextWithLeader() {
1269         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1270         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1271         followerConfig.setElectionTimeoutFactor(10000);
1272         followerActorContext.setConfigParams(followerConfig);
1273         followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1274         return followerActorContext;
1275     }
1276
1277     @Test
1278     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1279         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1280
1281         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1282
1283         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1284
1285         Follower follower = new Follower(followerActorContext);
1286         followerActor.underlyingActor().setBehavior(follower);
1287         followerActorContext.setCurrentBehavior(follower);
1288
1289         Map<String, String> peerAddresses = new HashMap<>();
1290         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1291
1292         leaderActorContext.setPeerAddresses(peerAddresses);
1293
1294         leaderActorContext.getReplicatedLog().removeFrom(0);
1295
1296         //create 3 entries
1297         leaderActorContext.setReplicatedLog(
1298                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1299
1300         leaderActorContext.setCommitIndex(1);
1301
1302         followerActorContext.getReplicatedLog().removeFrom(0);
1303
1304         // follower too has the exact same log entries and has the same commit index
1305         followerActorContext.setReplicatedLog(
1306                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1307
1308         followerActorContext.setCommitIndex(1);
1309
1310         leader = new Leader(leaderActorContext);
1311         leaderActorContext.setCurrentBehavior(leader);
1312
1313         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1314
1315         assertEquals(-1, appendEntries.getLeaderCommit());
1316         assertEquals(0, appendEntries.getEntries().size());
1317         assertEquals(0, appendEntries.getPrevLogIndex());
1318
1319         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1320                 leaderActor, AppendEntriesReply.class);
1321
1322         assertEquals(2, appendEntriesReply.getLogLastIndex());
1323         assertEquals(1, appendEntriesReply.getLogLastTerm());
1324
1325         // follower returns its next index
1326         assertEquals(2, appendEntriesReply.getLogLastIndex());
1327         assertEquals(1, appendEntriesReply.getLogLastTerm());
1328
1329         follower.close();
1330     }
1331
1332     @Test
1333     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1334         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1335
1336         final MockRaftActorContext leaderActorContext = createActorContext();
1337
1338         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1339         followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1340
1341         Follower follower = new Follower(followerActorContext);
1342         followerActor.underlyingActor().setBehavior(follower);
1343         followerActorContext.setCurrentBehavior(follower);
1344
1345         Map<String, String> leaderPeerAddresses = new HashMap<>();
1346         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1347
1348         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1349
1350         leaderActorContext.getReplicatedLog().removeFrom(0);
1351
1352         leaderActorContext.setReplicatedLog(
1353                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1354
1355         leaderActorContext.setCommitIndex(1);
1356
1357         followerActorContext.getReplicatedLog().removeFrom(0);
1358
1359         followerActorContext.setReplicatedLog(
1360                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1361
1362         // follower has the same log entries but its commit index > leaders commit index
1363         followerActorContext.setCommitIndex(2);
1364
1365         leader = new Leader(leaderActorContext);
1366
1367         // Initial heartbeat
1368         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1369
1370         assertEquals(-1, appendEntries.getLeaderCommit());
1371         assertEquals(0, appendEntries.getEntries().size());
1372         assertEquals(0, appendEntries.getPrevLogIndex());
1373
1374         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1375                 leaderActor, AppendEntriesReply.class);
1376
1377         assertEquals(2, appendEntriesReply.getLogLastIndex());
1378         assertEquals(1, appendEntriesReply.getLogLastTerm());
1379
1380         leaderActor.underlyingActor().setBehavior(follower);
1381         leader.handleMessage(followerActor, appendEntriesReply);
1382
1383         leaderActor.underlyingActor().clear();
1384         followerActor.underlyingActor().clear();
1385
1386         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1387                 TimeUnit.MILLISECONDS);
1388
1389         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1390
1391         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392
1393         assertEquals(2, appendEntries.getLeaderCommit());
1394         assertEquals(0, appendEntries.getEntries().size());
1395         assertEquals(2, appendEntries.getPrevLogIndex());
1396
1397         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1398
1399         assertEquals(2, appendEntriesReply.getLogLastIndex());
1400         assertEquals(1, appendEntriesReply.getLogLastTerm());
1401
1402         assertEquals(2, followerActorContext.getCommitIndex());
1403
1404         follower.close();
1405     }
1406
1407     @Test
1408     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1409         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1410
1411         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1412         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1413                 new FiniteDuration(1000, TimeUnit.SECONDS));
1414
1415         leaderActorContext.setReplicatedLog(
1416                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1417         long leaderCommitIndex = 2;
1418         leaderActorContext.setCommitIndex(leaderCommitIndex);
1419         leaderActorContext.setLastApplied(leaderCommitIndex);
1420
1421         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1422         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1423
1424         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1425
1426         followerActorContext.setReplicatedLog(
1427                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1428         followerActorContext.setCommitIndex(0);
1429         followerActorContext.setLastApplied(0);
1430
1431         Follower follower = new Follower(followerActorContext);
1432         followerActor.underlyingActor().setBehavior(follower);
1433
1434         leader = new Leader(leaderActorContext);
1435
1436         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1437         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1438                 AppendEntriesReply.class);
1439
1440         MessageCollectorActor.clearMessages(followerActor);
1441         MessageCollectorActor.clearMessages(leaderActor);
1442
1443         // Verify initial AppendEntries sent.
1444         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1445         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1446         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1447
1448         leaderActor.underlyingActor().setBehavior(leader);
1449
1450         leader.handleMessage(followerActor, appendEntriesReply);
1451
1452         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1453         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1454
1455         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1456         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1457         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1458
1459         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1460         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1461                 appendEntries.getEntries().get(0).getData());
1462         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1463         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1464                 appendEntries.getEntries().get(1).getData());
1465
1466         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1467         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1468
1469         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1470
1471         ApplyState applyState = applyStateList.get(0);
1472         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1473         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1474         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1475                 applyState.getReplicatedLogEntry().getData());
1476
1477         applyState = applyStateList.get(1);
1478         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1479         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1480         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1481                 applyState.getReplicatedLogEntry().getData());
1482
1483         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1484         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1485     }
1486
1487     @Test
1488     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1489         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1490
1491         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1492         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1493                 new FiniteDuration(1000, TimeUnit.SECONDS));
1494
1495         leaderActorContext.setReplicatedLog(
1496                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1497         long leaderCommitIndex = 1;
1498         leaderActorContext.setCommitIndex(leaderCommitIndex);
1499         leaderActorContext.setLastApplied(leaderCommitIndex);
1500
1501         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1502         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1503
1504         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1505
1506         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1507         followerActorContext.setCommitIndex(-1);
1508         followerActorContext.setLastApplied(-1);
1509
1510         Follower follower = new Follower(followerActorContext);
1511         followerActor.underlyingActor().setBehavior(follower);
1512         followerActorContext.setCurrentBehavior(follower);
1513
1514         leader = new Leader(leaderActorContext);
1515
1516         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1517         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1518                 AppendEntriesReply.class);
1519
1520         MessageCollectorActor.clearMessages(followerActor);
1521         MessageCollectorActor.clearMessages(leaderActor);
1522
1523         // Verify initial AppendEntries sent with the leader's current commit index.
1524         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1525         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1526         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1527
1528         leaderActor.underlyingActor().setBehavior(leader);
1529         leaderActorContext.setCurrentBehavior(leader);
1530
1531         leader.handleMessage(followerActor, appendEntriesReply);
1532
1533         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1534         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1535
1536         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1537         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1538         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1539
1540         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1541         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1542                 appendEntries.getEntries().get(0).getData());
1543         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1544         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1545                 appendEntries.getEntries().get(1).getData());
1546
1547         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1548         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1549
1550         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1551
1552         ApplyState applyState = applyStateList.get(0);
1553         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1554         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1555         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1556                 applyState.getReplicatedLogEntry().getData());
1557
1558         applyState = applyStateList.get(1);
1559         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1560         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1561         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1562                 applyState.getReplicatedLogEntry().getData());
1563
1564         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1565         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1566     }
1567
1568     @Test
1569     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1570         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1571
1572         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1573         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1574                 new FiniteDuration(1000, TimeUnit.SECONDS));
1575
1576         leaderActorContext.setReplicatedLog(
1577                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1578         long leaderCommitIndex = 1;
1579         leaderActorContext.setCommitIndex(leaderCommitIndex);
1580         leaderActorContext.setLastApplied(leaderCommitIndex);
1581
1582         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1583         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1584
1585         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1586
1587         followerActorContext.setReplicatedLog(
1588                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1589         followerActorContext.setCommitIndex(-1);
1590         followerActorContext.setLastApplied(-1);
1591
1592         Follower follower = new Follower(followerActorContext);
1593         followerActor.underlyingActor().setBehavior(follower);
1594         followerActorContext.setCurrentBehavior(follower);
1595
1596         leader = new Leader(leaderActorContext);
1597
1598         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1599         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1600                 AppendEntriesReply.class);
1601
1602         MessageCollectorActor.clearMessages(followerActor);
1603         MessageCollectorActor.clearMessages(leaderActor);
1604
1605         // Verify initial AppendEntries sent with the leader's current commit index.
1606         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1607         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1608         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1609
1610         leaderActor.underlyingActor().setBehavior(leader);
1611         leaderActorContext.setCurrentBehavior(leader);
1612
1613         leader.handleMessage(followerActor, appendEntriesReply);
1614
1615         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1616         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1617
1618         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1619         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1620         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1621
1622         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1623         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1624         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1625                 appendEntries.getEntries().get(0).getData());
1626         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1627         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1628         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1629                 appendEntries.getEntries().get(1).getData());
1630
1631         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1632         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1633
1634         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1635
1636         ApplyState applyState = applyStateList.get(0);
1637         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1638         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1639         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1640                 applyState.getReplicatedLogEntry().getData());
1641
1642         applyState = applyStateList.get(1);
1643         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1644         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1645         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1646                 applyState.getReplicatedLogEntry().getData());
1647
1648         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1649         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1650         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1651     }
1652
1653     @Test
1654     public void testHandleAppendEntriesReplyWithNewerTerm() {
1655         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1656
1657         MockRaftActorContext leaderActorContext = createActorContext();
1658         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1659                 new FiniteDuration(10000, TimeUnit.SECONDS));
1660
1661         leaderActorContext.setReplicatedLog(
1662                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1663
1664         leader = new Leader(leaderActorContext);
1665         leaderActor.underlyingActor().setBehavior(leader);
1666         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1667
1668         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1669                 AppendEntriesReply.class);
1670
1671         assertEquals(false, appendEntriesReply.isSuccess());
1672         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1673
1674         MessageCollectorActor.clearMessages(leaderActor);
1675     }
1676
1677     @Test
1678     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1679         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1680
1681         MockRaftActorContext leaderActorContext = createActorContext();
1682         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1683                 new FiniteDuration(10000, TimeUnit.SECONDS));
1684
1685         leaderActorContext.setReplicatedLog(
1686                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1687         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1688
1689         leader = new Leader(leaderActorContext);
1690         leaderActor.underlyingActor().setBehavior(leader);
1691         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1692
1693         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1694                 AppendEntriesReply.class);
1695
1696         assertEquals(false, appendEntriesReply.isSuccess());
1697         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1698
1699         MessageCollectorActor.clearMessages(leaderActor);
1700     }
1701
1702     @Test
1703     public void testHandleAppendEntriesReplySuccess() {
1704         logStart("testHandleAppendEntriesReplySuccess");
1705
1706         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1707
1708         leaderActorContext.setReplicatedLog(
1709                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1710
1711         leaderActorContext.setCommitIndex(1);
1712         leaderActorContext.setLastApplied(1);
1713         leaderActorContext.getTermInformation().update(1, "leader");
1714
1715         leader = new Leader(leaderActorContext);
1716
1717         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1718
1719         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1720         assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
1721
1722         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1723
1724         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1725
1726         assertEquals(RaftState.Leader, raftActorBehavior.state());
1727
1728         assertEquals(2, leaderActorContext.getCommitIndex());
1729
1730         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1731                 leaderActor, ApplyJournalEntries.class);
1732
1733         assertEquals(2, leaderActorContext.getLastApplied());
1734
1735         assertEquals(2, applyJournalEntries.getToIndex());
1736
1737         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1738                 ApplyState.class);
1739
1740         assertEquals(1,applyStateList.size());
1741
1742         ApplyState applyState = applyStateList.get(0);
1743
1744         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1745
1746         assertEquals(2, followerInfo.getMatchIndex());
1747         assertEquals(3, followerInfo.getNextIndex());
1748         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1749         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1750     }
1751
1752     @Test
1753     public void testHandleAppendEntriesReplyUnknownFollower() {
1754         logStart("testHandleAppendEntriesReplyUnknownFollower");
1755
1756         MockRaftActorContext leaderActorContext = createActorContext();
1757
1758         leader = new Leader(leaderActorContext);
1759
1760         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1761
1762         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1763
1764         assertEquals(RaftState.Leader, raftActorBehavior.state());
1765     }
1766
1767     @Test
1768     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1769         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1770
1771         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1772         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1773                 new FiniteDuration(1000, TimeUnit.SECONDS));
1774         // Note: the size here depends on estimate
1775         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(246);
1776
1777         leaderActorContext.setReplicatedLog(
1778                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1779         long leaderCommitIndex = 3;
1780         leaderActorContext.setCommitIndex(leaderCommitIndex);
1781         leaderActorContext.setLastApplied(leaderCommitIndex);
1782
1783         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1784         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1785         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1786         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1787
1788         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1789
1790         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1791         followerActorContext.setCommitIndex(-1);
1792         followerActorContext.setLastApplied(-1);
1793
1794         Follower follower = new Follower(followerActorContext);
1795         followerActor.underlyingActor().setBehavior(follower);
1796         followerActorContext.setCurrentBehavior(follower);
1797
1798         leader = new Leader(leaderActorContext);
1799
1800         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1801         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1802                 AppendEntriesReply.class);
1803
1804         MessageCollectorActor.clearMessages(followerActor);
1805         MessageCollectorActor.clearMessages(leaderActor);
1806
1807         // Verify initial AppendEntries sent with the leader's current commit index.
1808         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1809         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1810         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1811
1812         leaderActor.underlyingActor().setBehavior(leader);
1813         leaderActorContext.setCurrentBehavior(leader);
1814
1815         leader.handleMessage(followerActor, appendEntriesReply);
1816
1817         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1818                 AppendEntries.class, 2);
1819         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1820
1821         appendEntries = appendEntriesList.get(0);
1822         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1823         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1824         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1825
1826         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1827         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1828                 appendEntries.getEntries().get(0).getData());
1829         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1830         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1831                 appendEntries.getEntries().get(1).getData());
1832
1833         appendEntries = appendEntriesList.get(1);
1834         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1835         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1836         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1837
1838         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1839         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1840                 appendEntries.getEntries().get(0).getData());
1841         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1842         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1843                 appendEntries.getEntries().get(1).getData());
1844
1845         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1846         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1847
1848         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1849
1850         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1851         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1852     }
1853
1854     @Test
1855     public void testHandleRequestVoteReply() {
1856         logStart("testHandleRequestVoteReply");
1857
1858         MockRaftActorContext leaderActorContext = createActorContext();
1859
1860         leader = new Leader(leaderActorContext);
1861
1862         // Should be a no-op.
1863         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1864                 new RequestVoteReply(1, true));
1865
1866         assertEquals(RaftState.Leader, raftActorBehavior.state());
1867
1868         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1869
1870         assertEquals(RaftState.Leader, raftActorBehavior.state());
1871     }
1872
1873     @Test
1874     public void testIsolatedLeaderCheckNoFollowers() {
1875         logStart("testIsolatedLeaderCheckNoFollowers");
1876
1877         MockRaftActorContext leaderActorContext = createActorContext();
1878
1879         leader = new Leader(leaderActorContext);
1880         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1881         assertTrue(newBehavior instanceof Leader);
1882     }
1883
1884     @Test
1885     public void testIsolatedLeaderCheckNoVotingFollowers() {
1886         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1887
1888         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1889         Follower follower = new Follower(followerActorContext);
1890         followerActor.underlyingActor().setBehavior(follower);
1891
1892         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1893         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1894                 new FiniteDuration(1000, TimeUnit.SECONDS));
1895         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1896
1897         leader = new Leader(leaderActorContext);
1898         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1899         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1900         assertTrue("Expected Leader", newBehavior instanceof Leader);
1901     }
1902
1903     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1904         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1905         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1906
1907         MockRaftActorContext leaderActorContext = createActorContext();
1908
1909         Map<String, String> peerAddresses = new HashMap<>();
1910         peerAddresses.put("follower-1", followerActor1.path().toString());
1911         peerAddresses.put("follower-2", followerActor2.path().toString());
1912
1913         leaderActorContext.setPeerAddresses(peerAddresses);
1914         leaderActorContext.setRaftPolicy(raftPolicy);
1915
1916         leader = new Leader(leaderActorContext);
1917
1918         leader.markFollowerActive("follower-1");
1919         leader.markFollowerActive("follower-2");
1920         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1921         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1922
1923         // kill 1 follower and verify if that got killed
1924         final TestKit probe = new TestKit(getSystem());
1925         probe.watch(followerActor1);
1926         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1927         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1928         assertEquals(termMsg1.getActor(), followerActor1);
1929
1930         leader.markFollowerInActive("follower-1");
1931         leader.markFollowerActive("follower-2");
1932         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1933         assertTrue("Behavior not instance of Leader when majority of followers are active",
1934                 newBehavior instanceof Leader);
1935
1936         // kill 2nd follower and leader should change to Isolated leader
1937         followerActor2.tell(PoisonPill.getInstance(), null);
1938         probe.watch(followerActor2);
1939         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1940         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1941         assertEquals(termMsg2.getActor(), followerActor2);
1942
1943         leader.markFollowerInActive("follower-2");
1944         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1945     }
1946
1947     @Test
1948     public void testIsolatedLeaderCheckTwoFollowers() {
1949         logStart("testIsolatedLeaderCheckTwoFollowers");
1950
1951         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1952
1953         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1954             newBehavior instanceof IsolatedLeader);
1955     }
1956
1957     @Test
1958     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1959         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1960
1961         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1962
1963         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1964                 newBehavior instanceof Leader);
1965     }
1966
1967     @Test
1968     public void testLaggingFollowerStarvation() {
1969         logStart("testLaggingFollowerStarvation");
1970
1971         String leaderActorId = actorFactory.generateActorId("leader");
1972         String follower1ActorId = actorFactory.generateActorId("follower");
1973         String follower2ActorId = actorFactory.generateActorId("follower");
1974
1975         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1976         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1977
1978         MockRaftActorContext leaderActorContext =
1979                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1980
1981         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1982         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1983         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1984
1985         leaderActorContext.setConfigParams(configParams);
1986
1987         leaderActorContext.setReplicatedLog(
1988                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1989
1990         Map<String, String> peerAddresses = new HashMap<>();
1991         peerAddresses.put(follower1ActorId,
1992                 follower1Actor.path().toString());
1993         peerAddresses.put(follower2ActorId,
1994                 follower2Actor.path().toString());
1995
1996         leaderActorContext.setPeerAddresses(peerAddresses);
1997         leaderActorContext.getTermInformation().update(1, leaderActorId);
1998
1999         leader = createBehavior(leaderActorContext);
2000
2001         leaderActor.underlyingActor().setBehavior(leader);
2002
2003         for (int i = 1; i < 6; i++) {
2004             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2005             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2006                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2007             assertTrue(newBehavior == leader);
2008             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2009         }
2010
2011         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2012         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2013
2014         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2015                 heartbeats.size() > 1);
2016
2017         // Check if follower-2 got AppendEntries during this time and was not starved
2018         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2019
2020         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2021                 appendEntries.size() > 1);
2022     }
2023
2024     @Test
2025     public void testReplicationConsensusWithNonVotingFollower() {
2026         logStart("testReplicationConsensusWithNonVotingFollower");
2027
2028         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2029         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2030                 new FiniteDuration(1000, TimeUnit.SECONDS));
2031
2032         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2033         leaderActorContext.setCommitIndex(-1);
2034         leaderActorContext.setLastApplied(-1);
2035
2036         String nonVotingFollowerId = "nonvoting-follower";
2037         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2038                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2039
2040         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2041                 VotingState.NON_VOTING);
2042
2043         leader = new Leader(leaderActorContext);
2044         leaderActorContext.setCurrentBehavior(leader);
2045
2046         // Ignore initial heartbeats
2047         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2048         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2049
2050         MessageCollectorActor.clearMessages(followerActor);
2051         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2052         MessageCollectorActor.clearMessages(leaderActor);
2053
2054         // Send a Replicate message and wait for AppendEntries.
2055         sendReplicate(leaderActorContext, 0);
2056
2057         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2058         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2059
2060         // Send reply only from the voting follower and verify consensus via ApplyState.
2061         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2062
2063         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2064
2065         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2066
2067         MessageCollectorActor.clearMessages(followerActor);
2068         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2069         MessageCollectorActor.clearMessages(leaderActor);
2070
2071         // Send another Replicate message
2072         sendReplicate(leaderActorContext, 1);
2073
2074         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2075         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2076                 AppendEntries.class);
2077         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2078         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2079
2080         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2081         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2082
2083         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2084
2085         // Send reply from the voting follower and verify consensus.
2086         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2087
2088         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2089     }
2090
2091     @Test
2092     public void testTransferLeadershipWithFollowerInSync() {
2093         logStart("testTransferLeadershipWithFollowerInSync");
2094
2095         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2096         leaderActorContext.setLastApplied(-1);
2097         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2098                 new FiniteDuration(1000, TimeUnit.SECONDS));
2099         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2100
2101         leader = new Leader(leaderActorContext);
2102         leaderActorContext.setCurrentBehavior(leader);
2103
2104         // Initial heartbeat
2105         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2106         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2107         MessageCollectorActor.clearMessages(followerActor);
2108
2109         sendReplicate(leaderActorContext, 0);
2110         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2111
2112         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2113         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2114         MessageCollectorActor.clearMessages(followerActor);
2115
2116         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2117         leader.transferLeadership(mockTransferCohort);
2118
2119         verify(mockTransferCohort, never()).transferComplete();
2120         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2121         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2122         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2123
2124         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2125         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2126
2127         // Leader should force an election timeout
2128         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2129
2130         verify(mockTransferCohort).transferComplete();
2131     }
2132
2133     @Test
2134     public void testTransferLeadershipWithEmptyLog() {
2135         logStart("testTransferLeadershipWithEmptyLog");
2136
2137         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2138         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2139                 new FiniteDuration(1000, TimeUnit.SECONDS));
2140         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2141
2142         leader = new Leader(leaderActorContext);
2143         leaderActorContext.setCurrentBehavior(leader);
2144
2145         // Initial heartbeat
2146         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2147         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2148         MessageCollectorActor.clearMessages(followerActor);
2149
2150         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2151         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2152         leader.transferLeadership(mockTransferCohort);
2153
2154         verify(mockTransferCohort, never()).transferComplete();
2155         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2156         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2157
2158         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2159         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2160
2161         // Leader should force an election timeout
2162         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2163
2164         verify(mockTransferCohort).transferComplete();
2165     }
2166
2167     @Test
2168     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2169         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2170
2171         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2172         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2173                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2174
2175         leader = new Leader(leaderActorContext);
2176         leaderActorContext.setCurrentBehavior(leader);
2177
2178         // Initial heartbeat
2179         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2180         MessageCollectorActor.clearMessages(followerActor);
2181
2182         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2183         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2184         leader.transferLeadership(mockTransferCohort);
2185
2186         verify(mockTransferCohort, never()).transferComplete();
2187
2188         // Sync up the follower.
2189         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2190         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2191         MessageCollectorActor.clearMessages(followerActor);
2192
2193         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2194                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2195         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2196         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2197         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2198
2199         // Leader should force an election timeout
2200         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2201
2202         verify(mockTransferCohort).transferComplete();
2203     }
2204
2205     @Test
2206     public void testTransferLeadershipWithFollowerSyncTimeout() {
2207         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2208
2209         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2210         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2211                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2212         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2213         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2214
2215         leader = new Leader(leaderActorContext);
2216         leaderActorContext.setCurrentBehavior(leader);
2217
2218         // Initial heartbeat
2219         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2220         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2221         MessageCollectorActor.clearMessages(followerActor);
2222
2223         sendReplicate(leaderActorContext, 0);
2224         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2225
2226         MessageCollectorActor.clearMessages(followerActor);
2227
2228         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2229         leader.transferLeadership(mockTransferCohort);
2230
2231         verify(mockTransferCohort, never()).transferComplete();
2232
2233         // Send heartbeats to time out the transfer.
2234         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2235             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2236                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2237             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2238         }
2239
2240         verify(mockTransferCohort).abortTransfer();
2241         verify(mockTransferCohort, never()).transferComplete();
2242         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2243     }
2244
2245     @Test
2246     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2247         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2248
2249         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2250                 List.of(new SimpleReplicatedLogEntry(0, 1,
2251                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2252         final MockRaftActorContext.MockPayload largePayload =
2253                 new MockRaftActorContext.MockPayload("large", serializedSize);
2254
2255         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2256         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2257                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2258         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(serializedSize - 50);
2259         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2260         leaderActorContext.setCommitIndex(-1);
2261         leaderActorContext.setLastApplied(-1);
2262
2263         leader = new Leader(leaderActorContext);
2264         leaderActorContext.setCurrentBehavior(leader);
2265
2266         // Send initial heartbeat reply so follower is marked active
2267         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2268         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2269         MessageCollectorActor.clearMessages(followerActor);
2270
2271         // Send normal payload first to prime commit index.
2272         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2273         sendReplicate(leaderActorContext, term, 0);
2274
2275         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2276         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2277         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2278
2279         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2280         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2281         MessageCollectorActor.clearMessages(followerActor);
2282
2283         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2284         sendReplicate(leaderActorContext, term, 1, largePayload);
2285
2286         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2287         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2288         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2289
2290         final Identifier slicingId = messageSlice.getIdentifier();
2291
2292         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2293         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2294         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2295         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2296         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2297         MessageCollectorActor.clearMessages(followerActor);
2298
2299         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2300
2301         // Sleep for the heartbeat interval so AppendEntries is sent.
2302         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2303                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2304
2305         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2306
2307         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2308         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2309         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2310         MessageCollectorActor.clearMessages(followerActor);
2311
2312         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2313
2314         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2315         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2316         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2317
2318         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2319
2320         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2321
2322         MessageCollectorActor.clearMessages(followerActor);
2323
2324         // Send another normal payload.
2325
2326         sendReplicate(leaderActorContext, term, 2);
2327
2328         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2329         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2330         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2331         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2332     }
2333
2334     @Test
2335     public void testLargePayloadSlicingExpiration() {
2336         logStart("testLargePayloadSlicingExpiration");
2337
2338         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2339         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2340                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2341         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2342         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(10);
2343         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2344         leaderActorContext.setCommitIndex(-1);
2345         leaderActorContext.setLastApplied(-1);
2346
2347         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2348         leader = new Leader(leaderActorContext);
2349         leaderActorContext.setCurrentBehavior(leader);
2350
2351         // Send initial heartbeat reply so follower is marked active
2352         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2353         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2354         MessageCollectorActor.clearMessages(followerActor);
2355
2356         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2357                 leaderActorContext.getConfigParams().getMaximumMessageSliceSize() + 1));
2358         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2359
2360         // Sleep for at least 3 * election timeout so the slicing state expires.
2361         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2362                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2363         MessageCollectorActor.clearMessages(followerActor);
2364
2365         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2366
2367         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2368         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2369         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2370
2371         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2372         MessageCollectorActor.clearMessages(followerActor);
2373
2374         // Send an AppendEntriesReply - this should restart the slicing.
2375
2376         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2377                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2378
2379         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2380
2381         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2382     }
2383
2384     @Test
2385     public void testLeaderAddressInAppendEntries() {
2386         logStart("testLeaderAddressInAppendEntries");
2387
2388         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2389         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2390                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2391         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2392         leaderActorContext.setCommitIndex(-1);
2393         leaderActorContext.setLastApplied(-1);
2394
2395         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2396             peerId -> leaderActor.path().toString());
2397
2398         leader = new Leader(leaderActorContext);
2399         leaderActorContext.setCurrentBehavior(leader);
2400
2401         // Initial heartbeat shouldn't have the leader address
2402
2403         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2404         assertNull(appendEntries.leaderAddress());
2405         MessageCollectorActor.clearMessages(followerActor);
2406
2407         // Send AppendEntriesReply indicating the follower needs the leader address
2408
2409         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2410                 RaftVersions.CURRENT_VERSION));
2411
2412         // Sleep for the heartbeat interval so AppendEntries is sent.
2413         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2414                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2415
2416         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2417
2418         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2419         assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress());
2420         MessageCollectorActor.clearMessages(followerActor);
2421
2422         // Send AppendEntriesReply indicating the follower does not need the leader address
2423
2424         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2425                 RaftVersions.CURRENT_VERSION));
2426
2427         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2428                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2429
2430         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2431
2432         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2433         assertNull(appendEntries.leaderAddress());
2434     }
2435
2436     @Override
2437     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2438             final ActorRef actorRef, final RaftRPC rpc) {
2439         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2440         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2441     }
2442
2443     private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2444
2445         private final long electionTimeOutIntervalMillis;
2446         private final int maximumMessageSliceSize;
2447
2448         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int maximumMessageSliceSize) {
2449             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2450             this.maximumMessageSliceSize = maximumMessageSliceSize;
2451         }
2452
2453         @Override
2454         public FiniteDuration getElectionTimeOutInterval() {
2455             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2456         }
2457
2458         @Override
2459         public int getMaximumMessageSliceSize() {
2460             return maximumMessageSliceSize;
2461         }
2462     }
2463 }