Remove support for pre-Fluorine versions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.OptionalInt;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.messaging.MessageSlice;
44 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
47 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
50 import org.opendaylight.controller.cluster.raft.RaftState;
51 import org.opendaylight.controller.cluster.raft.RaftVersions;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.VotingState;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
56 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
57 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
59 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
61 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
65 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.messages.Payload;
67 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
71 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
72 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
73 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
74 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
75 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import org.opendaylight.yangtools.concepts.Identifier;
78 import scala.concurrent.duration.FiniteDuration;
79
80 public class LeaderTest extends AbstractLeaderTest<Leader> {
81
82     static final String FOLLOWER_ID = "follower";
83     public static final String LEADER_ID = "leader";
84
85     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
86             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
87
88     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
89             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
90
91     private Leader leader;
92     private final short payloadVersion = 5;
93
94     @Override
95     @After
96     public void tearDown() {
97         if (leader != null) {
98             leader.close();
99         }
100
101         super.tearDown();
102     }
103
104     @Test
105     public void testHandleMessageForUnknownMessage() {
106         logStart("testHandleMessageForUnknownMessage");
107
108         leader = new Leader(createActorContext());
109
110         // handle message should null when it receives an unknown message
111         assertNull(leader.handleMessage(followerActor, "foo"));
112     }
113
114     @Test
115     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
116         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
117
118         MockRaftActorContext actorContext = createActorContextWithFollower();
119         actorContext.setCommitIndex(-1);
120         actorContext.setPayloadVersion(payloadVersion);
121
122         long term = 1;
123         actorContext.getTermInformation().update(term, "");
124
125         leader = new Leader(actorContext);
126         actorContext.setCurrentBehavior(leader);
127
128         // Leader should send an immediate heartbeat with no entries as follower is inactive.
129         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
130         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
131         assertEquals("getTerm", term, appendEntries.getTerm());
132         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
133         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
134         assertEquals("Entries size", 0, appendEntries.getEntries().size());
135         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
136
137         // The follower would normally reply - simulate that explicitly here.
138         leader.handleMessage(followerActor, new AppendEntriesReply(
139                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
140         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
141
142         followerActor.underlyingActor().clear();
143
144         // Sleep for the heartbeat interval so AppendEntries is sent.
145         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
146                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
147
148         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
149
150         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
152         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
153         assertEquals("Entries size", 1, appendEntries.getEntries().size());
154         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
155         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
156         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
157     }
158
159
160     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
161         return sendReplicate(actorContext, 1, index);
162     }
163
164     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
165             final long index) {
166         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
167     }
168
169     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
170             final Payload payload) {
171         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
172         actorContext.getReplicatedLog().append(newEntry);
173         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
174     }
175
176     @Test
177     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
178         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
179
180         MockRaftActorContext actorContext = createActorContextWithFollower();
181
182         long term = 1;
183         actorContext.getTermInformation().update(term, "");
184
185         leader = new Leader(actorContext);
186
187         // Leader will send an immediate heartbeat - ignore it.
188         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189
190         // The follower would normally reply - simulate that explicitly here.
191         long lastIndex = actorContext.getReplicatedLog().lastIndex();
192         leader.handleMessage(followerActor, new AppendEntriesReply(
193                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
194         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
195
196         followerActor.underlyingActor().clear();
197
198         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
199
200         // State should not change
201         assertTrue(raftBehavior instanceof Leader);
202
203         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
204         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
205         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
206         assertEquals("Entries size", 1, appendEntries.getEntries().size());
207         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
208         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
209         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
210         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
211     }
212
213     @Test
214     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
215         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
216
217         MockRaftActorContext actorContext = createActorContextWithFollower();
218         actorContext.setCommitIndex(-1);
219         actorContext.setLastApplied(-1);
220
221         // The raft context is initialized with a couple log entries. However the commitIndex
222         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
223         // committed and applied. Now it regains leadership with a higher term (2).
224         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
225         long newTerm = prevTerm + 1;
226         actorContext.getTermInformation().update(newTerm, "");
227
228         leader = new Leader(actorContext);
229         actorContext.setCurrentBehavior(leader);
230
231         // Leader will send an immediate heartbeat - ignore it.
232         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233
234         // The follower replies with the leader's current last index and term, simulating that it is
235         // up to date with the leader.
236         long lastIndex = actorContext.getReplicatedLog().lastIndex();
237         leader.handleMessage(followerActor, new AppendEntriesReply(
238                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
239
240         // The commit index should not get updated even though consensus was reached. This is b/c the
241         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
242         // from previous terms by counting replicas".
243         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
244
245         followerActor.underlyingActor().clear();
246
247         // Now replicate a new entry with the new term 2.
248         long newIndex = lastIndex + 1;
249         sendReplicate(actorContext, newTerm, newIndex);
250
251         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
252         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
253         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
254         assertEquals("Entries size", 1, appendEntries.getEntries().size());
255         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
256         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
257         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
258
259         // The follower replies with success. The leader should now update the commit index to the new index
260         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
261         // prior entries are committed indirectly".
262         leader.handleMessage(followerActor, new AppendEntriesReply(
263                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
264
265         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
266     }
267
268     @Test
269     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
270         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
271
272         MockRaftActorContext actorContext = createActorContextWithFollower();
273         actorContext.setRaftPolicy(createRaftPolicy(true, true));
274
275         long term = 1;
276         actorContext.getTermInformation().update(term, "");
277
278         leader = new Leader(actorContext);
279
280         // Leader will send an immediate heartbeat - ignore it.
281         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282
283         // The follower would normally reply - simulate that explicitly here.
284         long lastIndex = actorContext.getReplicatedLog().lastIndex();
285         leader.handleMessage(followerActor, new AppendEntriesReply(
286                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
287         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
288
289         followerActor.underlyingActor().clear();
290
291         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
292
293         // State should not change
294         assertTrue(raftBehavior instanceof Leader);
295
296         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
297         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
298         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
299         assertEquals("Entries size", 1, appendEntries.getEntries().size());
300         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
301         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
302         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
303         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
304     }
305
306     @Test
307     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
308         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
309
310         MockRaftActorContext actorContext = createActorContextWithFollower();
311         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312             @Override
313             public FiniteDuration getHeartBeatInterval() {
314                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
315             }
316         });
317
318         long term = 1;
319         actorContext.getTermInformation().update(term, "");
320
321         leader = new Leader(actorContext);
322
323         // Leader will send an immediate heartbeat - ignore it.
324         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325
326         // The follower would normally reply - simulate that explicitly here.
327         long lastIndex = actorContext.getReplicatedLog().lastIndex();
328         leader.handleMessage(followerActor, new AppendEntriesReply(
329                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
330         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
331
332         followerActor.underlyingActor().clear();
333
334         for (int i = 0; i < 5; i++) {
335             sendReplicate(actorContext, lastIndex + i + 1);
336         }
337
338         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
339         // We expect only 1 message to be sent because of two reasons,
340         // - an append entries reply was not received
341         // - the heartbeat interval has not expired
342         // In this scenario if multiple messages are sent they would likely be duplicates
343         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
344     }
345
346     @Test
347     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
348         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
349
350         MockRaftActorContext actorContext = createActorContextWithFollower();
351         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352             @Override
353             public FiniteDuration getHeartBeatInterval() {
354                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
355             }
356         });
357
358         long term = 1;
359         actorContext.getTermInformation().update(term, "");
360
361         leader = new Leader(actorContext);
362
363         // Leader will send an immediate heartbeat - ignore it.
364         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365
366         // The follower would normally reply - simulate that explicitly here.
367         long lastIndex = actorContext.getReplicatedLog().lastIndex();
368         leader.handleMessage(followerActor, new AppendEntriesReply(
369                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371
372         followerActor.underlyingActor().clear();
373
374         for (int i = 0; i < 3; i++) {
375             sendReplicate(actorContext, lastIndex + i + 1);
376             leader.handleMessage(followerActor, new AppendEntriesReply(
377                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
378         }
379
380         // We are expecting six messages here -- a request to replicate and a consensus-reached message
381         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
382         assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
383         for (int i = 0; i < 3; i++) {
384             assertRequestEntry(lastIndex, allMessages, i);
385             assertCommitEntry(lastIndex, allMessages, i);
386         }
387
388         // Now perform another commit, eliciting a request to persist
389         sendReplicate(actorContext, lastIndex + 3 + 1);
390         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
391         // This elicits another message for request to replicate
392         assertEquals("The number of request entries collected", 7, allMessages.size());
393         assertRequestEntry(lastIndex, allMessages, 3);
394
395         sendReplicate(actorContext, lastIndex + 4 + 1);
396         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
397         assertEquals("The number of request entries collected", 7, allMessages.size());
398     }
399
400     private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
401             final int messageNr) {
402         final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
403         assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
404         assertEquals(List.of(), commitReq.getEntries());
405     }
406
407     private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
408             final int messageNr) {
409         final AppendEntries req = allMessages.get(2 * messageNr);
410         assertEquals(lastIndex + messageNr, req.getLeaderCommit());
411
412         final List<ReplicatedLogEntry> entries = req.getEntries();
413         assertEquals(1, entries.size());
414         assertEquals(messageNr + 2, entries.get(0).getIndex());
415     }
416
417     @Test
418     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
419         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
420
421         MockRaftActorContext actorContext = createActorContextWithFollower();
422         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
423             @Override
424             public FiniteDuration getHeartBeatInterval() {
425                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
426             }
427         });
428
429         long term = 1;
430         actorContext.getTermInformation().update(term, "");
431
432         leader = new Leader(actorContext);
433
434         // Leader will send an immediate heartbeat - ignore it.
435         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
436
437         // The follower would normally reply - simulate that explicitly here.
438         long lastIndex = actorContext.getReplicatedLog().lastIndex();
439         leader.handleMessage(followerActor, new AppendEntriesReply(
440                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
441         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
442
443         followerActor.underlyingActor().clear();
444
445         sendReplicate(actorContext, lastIndex + 1);
446
447         // Wait slightly longer than heartbeat duration
448         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
449
450         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
451
452         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
453         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
454
455         assertEquals(1, allMessages.get(0).getEntries().size());
456         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
457         assertEquals(1, allMessages.get(1).getEntries().size());
458         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
459
460     }
461
462     @Test
463     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
464         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
465
466         MockRaftActorContext actorContext = createActorContextWithFollower();
467         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
468             @Override
469             public FiniteDuration getHeartBeatInterval() {
470                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
471             }
472         });
473
474         long term = 1;
475         actorContext.getTermInformation().update(term, "");
476
477         leader = new Leader(actorContext);
478
479         // Leader will send an immediate heartbeat - ignore it.
480         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
481
482         // The follower would normally reply - simulate that explicitly here.
483         long lastIndex = actorContext.getReplicatedLog().lastIndex();
484         leader.handleMessage(followerActor, new AppendEntriesReply(
485                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
486         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
487
488         followerActor.underlyingActor().clear();
489
490         for (int i = 0; i < 3; i++) {
491             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
492             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
493         }
494
495         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
496         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
497     }
498
499     @Test
500     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
501         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
502
503         MockRaftActorContext actorContext = createActorContextWithFollower();
504         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
505             @Override
506             public FiniteDuration getHeartBeatInterval() {
507                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
508             }
509         });
510
511         long term = 1;
512         actorContext.getTermInformation().update(term, "");
513
514         leader = new Leader(actorContext);
515
516         // Leader will send an immediate heartbeat - ignore it.
517         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
518
519         // The follower would normally reply - simulate that explicitly here.
520         long lastIndex = actorContext.getReplicatedLog().lastIndex();
521         leader.handleMessage(followerActor, new AppendEntriesReply(
522                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
523         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
524
525         followerActor.underlyingActor().clear();
526
527         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
528         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
529         sendReplicate(actorContext, lastIndex + 1);
530
531         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
532         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
533
534         assertEquals(0, allMessages.get(0).getEntries().size());
535         assertEquals(1, allMessages.get(1).getEntries().size());
536     }
537
538
539     @Test
540     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
541         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
542
543         MockRaftActorContext actorContext = createActorContext();
544
545         leader = new Leader(actorContext);
546
547         actorContext.setLastApplied(0);
548
549         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
550         long term = actorContext.getTermInformation().getCurrentTerm();
551         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
552                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
553
554         actorContext.getReplicatedLog().append(newEntry);
555
556         final Identifier id = new MockIdentifier("state-id");
557         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
558                 new Replicate(leaderActor, id, newEntry, true));
559
560         // State should not change
561         assertTrue(raftBehavior instanceof Leader);
562
563         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
564
565         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
566         // one since lastApplied state is 0.
567         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
568                 leaderActor, ApplyState.class);
569         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
570
571         for (int i = 0; i <= newLogIndex - 1; i++) {
572             ApplyState applyState = applyStateList.get(i);
573             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
574             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
575         }
576
577         ApplyState last = applyStateList.get((int) newLogIndex - 1);
578         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
579         assertEquals("getIdentifier", id, last.getIdentifier());
580     }
581
582     @Test
583     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
584         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
585
586         final MockRaftActorContext actorContext = createActorContextWithFollower();
587
588         //clears leaders log
589         actorContext.getReplicatedLog().removeFrom(0);
590
591         final int commitIndex = 3;
592         final int snapshotIndex = 2;
593         final int snapshotTerm = 1;
594
595         // set the snapshot variables in replicatedlog
596         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
597         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
598         actorContext.setCommitIndex(commitIndex);
599         //set follower timeout to 2 mins, helps during debugging
600         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
601
602         leader = new Leader(actorContext);
603
604         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
605         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
606
607         //update follower timestamp
608         leader.markFollowerActive(FOLLOWER_ID);
609
610         ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
611         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
612                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
613                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
614         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
615                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
616         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
617         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
618
619         //send first chunk and no InstallSnapshotReply received yet
620         fts.getNextChunk();
621         fts.incrementChunkIndex();
622
623         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
624                 TimeUnit.MILLISECONDS);
625
626         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
627
628         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
629
630         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
631
632         //InstallSnapshotReply received
633         fts.markSendStatus(true);
634
635         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
636
637         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
638
639         assertEquals(commitIndex, is.getLastIncludedIndex());
640     }
641
642     @Test
643     public void testSendAppendEntriesSnapshotScenario() {
644         logStart("testSendAppendEntriesSnapshotScenario");
645
646         final MockRaftActorContext actorContext = createActorContextWithFollower();
647
648         Map<String, String> leadersSnapshot = new HashMap<>();
649         leadersSnapshot.put("1", "A");
650         leadersSnapshot.put("2", "B");
651         leadersSnapshot.put("3", "C");
652
653         //clears leaders log
654         actorContext.getReplicatedLog().removeFrom(0);
655
656         final int followersLastIndex = 2;
657         final int snapshotIndex = 3;
658         final int newEntryIndex = 4;
659         final int snapshotTerm = 1;
660         final int currentTerm = 2;
661
662         // set the snapshot variables in replicatedlog
663         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
664         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
665         actorContext.setCommitIndex(followersLastIndex);
666
667         leader = new Leader(actorContext);
668
669         // Leader will send an immediate heartbeat - ignore it.
670         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
671
672         // new entry
673         SimpleReplicatedLogEntry entry =
674                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
675                         new MockRaftActorContext.MockPayload("D"));
676
677         actorContext.getReplicatedLog().append(entry);
678
679         //update follower timestamp
680         leader.markFollowerActive(FOLLOWER_ID);
681
682         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
683         RaftActorBehavior raftBehavior = leader.handleMessage(
684                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
685
686         assertTrue(raftBehavior instanceof Leader);
687
688         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
689     }
690
691     @Test
692     public void testInitiateInstallSnapshot() {
693         logStart("testInitiateInstallSnapshot");
694
695         MockRaftActorContext actorContext = createActorContextWithFollower();
696
697         //clears leaders log
698         actorContext.getReplicatedLog().removeFrom(0);
699
700         final int followersLastIndex = 2;
701         final int snapshotIndex = 3;
702         final int newEntryIndex = 4;
703         final int snapshotTerm = 1;
704         final int currentTerm = 2;
705
706         // set the snapshot variables in replicatedlog
707         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
708         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
709         actorContext.setLastApplied(3);
710         actorContext.setCommitIndex(followersLastIndex);
711
712         leader = new Leader(actorContext);
713
714         // Leader will send an immediate heartbeat - ignore it.
715         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
716
717         // set the snapshot as absent and check if capture-snapshot is invoked.
718         leader.setSnapshotHolder(null);
719
720         // new entry
721         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
722                 new MockRaftActorContext.MockPayload("D"));
723
724         actorContext.getReplicatedLog().append(entry);
725
726         //update follower timestamp
727         leader.markFollowerActive(FOLLOWER_ID);
728
729         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
730
731         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
732
733         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
734
735         assertEquals(3, cs.getLastAppliedIndex());
736         assertEquals(1, cs.getLastAppliedTerm());
737         assertEquals(4, cs.getLastIndex());
738         assertEquals(2, cs.getLastTerm());
739
740         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
741         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
742
743         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
744     }
745
746     @Test
747     public void testInitiateForceInstallSnapshot() throws Exception {
748         logStart("testInitiateForceInstallSnapshot");
749
750         MockRaftActorContext actorContext = createActorContextWithFollower();
751
752         final int followersLastIndex = 2;
753         final int snapshotIndex = -1;
754         final int newEntryIndex = 4;
755         final int snapshotTerm = -1;
756         final int currentTerm = 2;
757
758         // set the snapshot variables in replicatedlog
759         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
760         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
761         actorContext.setLastApplied(3);
762         actorContext.setCommitIndex(followersLastIndex);
763
764         actorContext.getReplicatedLog().removeFrom(0);
765
766         AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
767         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
768
769         leader = new Leader(actorContext);
770         actorContext.setCurrentBehavior(leader);
771
772         // Leader will send an immediate heartbeat - ignore it.
773         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
774
775         // set the snapshot as absent and check if capture-snapshot is invoked.
776         leader.setSnapshotHolder(null);
777
778         for (int i = 0; i < 4; i++) {
779             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
780                     new MockRaftActorContext.MockPayload("X" + i)));
781         }
782
783         // new entry
784         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
785                 new MockRaftActorContext.MockPayload("D"));
786
787         actorContext.getReplicatedLog().append(entry);
788
789         //update follower timestamp
790         leader.markFollowerActive(FOLLOWER_ID);
791
792         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
793         // installed with a SendInstallSnapshot
794         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
795                 RaftVersions.CURRENT_VERSION));
796
797         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
798
799         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
800         assertEquals(3, cs.getLastAppliedIndex());
801         assertEquals(1, cs.getLastAppliedTerm());
802         assertEquals(4, cs.getLastIndex());
803         assertEquals(2, cs.getLastTerm());
804
805         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
806         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
807
808         MessageCollectorActor.clearMessages(followerActor);
809
810         // Sending Replicate message should not initiate another capture since the first is in progress.
811         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
812         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
813
814         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
815         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
816                 RaftVersions.CURRENT_VERSION));
817         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
818
819         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
820         final byte[] bytes = new byte[]{1, 2, 3};
821         installSnapshotStream.get().orElseThrow().write(bytes);
822         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
823                 Runtime.getRuntime().totalMemory());
824         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825
826         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
827         MessageCollectorActor.clearMessages(followerActor);
828         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
829                 RaftVersions.CURRENT_VERSION));
830         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
831     }
832
833
834     @Test
835     public void testInstallSnapshot() {
836         logStart("testInstallSnapshot");
837
838         final MockRaftActorContext actorContext = createActorContextWithFollower();
839
840         Map<String, String> leadersSnapshot = new HashMap<>();
841         leadersSnapshot.put("1", "A");
842         leadersSnapshot.put("2", "B");
843         leadersSnapshot.put("3", "C");
844
845         //clears leaders log
846         actorContext.getReplicatedLog().removeFrom(0);
847
848         final int lastAppliedIndex = 3;
849         final int snapshotIndex = 2;
850         final int snapshotTerm = 1;
851         final int currentTerm = 2;
852
853         // set the snapshot variables in replicatedlog
854         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
855         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
856         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
857         actorContext.setCommitIndex(lastAppliedIndex);
858         actorContext.setLastApplied(lastAppliedIndex);
859
860         leader = new Leader(actorContext);
861
862         // Initial heartbeat.
863         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
864
865         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
866         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
867
868         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
869         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
870                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
871
872         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
873                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
874
875         assertTrue(raftBehavior instanceof Leader);
876
877         // check if installsnapshot gets called with the correct values.
878
879         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
880                 InstallSnapshot.class);
881
882         assertNotNull(installSnapshot.getData());
883         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
884         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
885
886         assertEquals(currentTerm, installSnapshot.getTerm());
887     }
888
889     @Test
890     public void testForceInstallSnapshot() {
891         logStart("testForceInstallSnapshot");
892
893         final MockRaftActorContext actorContext = createActorContextWithFollower();
894
895         Map<String, String> leadersSnapshot = new HashMap<>();
896         leadersSnapshot.put("1", "A");
897         leadersSnapshot.put("2", "B");
898         leadersSnapshot.put("3", "C");
899
900         final int lastAppliedIndex = 3;
901         final int snapshotIndex = -1;
902         final int snapshotTerm = -1;
903         final int currentTerm = 2;
904
905         // set the snapshot variables in replicatedlog
906         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
907         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
908         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
909         actorContext.setCommitIndex(lastAppliedIndex);
910         actorContext.setLastApplied(lastAppliedIndex);
911
912         leader = new Leader(actorContext);
913
914         // Initial heartbeat.
915         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
916
917         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
918         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
919
920         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
921         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
922                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
923
924         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
925                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
926
927         assertTrue(raftBehavior instanceof Leader);
928
929         // check if installsnapshot gets called with the correct values.
930
931         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
932                 InstallSnapshot.class);
933
934         assertNotNull(installSnapshot.getData());
935         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
936         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
937
938         assertEquals(currentTerm, installSnapshot.getTerm());
939     }
940
941     @Test
942     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
943         logStart("testHandleInstallSnapshotReplyLastChunk");
944
945         MockRaftActorContext actorContext = createActorContextWithFollower();
946
947         final int commitIndex = 3;
948         final int snapshotIndex = 2;
949         final int snapshotTerm = 1;
950         final int currentTerm = 2;
951
952         actorContext.setCommitIndex(commitIndex);
953
954         leader = new Leader(actorContext);
955         actorContext.setCurrentBehavior(leader);
956
957         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
958         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
959
960         // Ignore initial heartbeat.
961         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
962
963         Map<String, String> leadersSnapshot = new HashMap<>();
964         leadersSnapshot.put("1", "A");
965         leadersSnapshot.put("2", "B");
966         leadersSnapshot.put("3", "C");
967
968         // set the snapshot variables in replicatedlog
969
970         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
971         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
972         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
973
974         ByteString bs = toByteString(leadersSnapshot);
975         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
976                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
977                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
978         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
979                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
980         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
981         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
982         while (!fts.isLastChunk(fts.getChunkIndex())) {
983             fts.getNextChunk();
984             fts.incrementChunkIndex();
985         }
986
987         //clears leaders log
988         actorContext.getReplicatedLog().removeFrom(0);
989
990         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
991                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
992
993         assertTrue(raftBehavior instanceof Leader);
994
995         assertEquals(1, leader.followerLogSize());
996         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
997         assertNotNull(fli);
998         assertNull(fli.getInstallSnapshotState());
999         assertEquals(commitIndex, fli.getMatchIndex());
1000         assertEquals(commitIndex + 1, fli.getNextIndex());
1001         assertFalse(leader.hasSnapshot());
1002     }
1003
1004     @Test
1005     public void testSendSnapshotfromInstallSnapshotReply() {
1006         logStart("testSendSnapshotfromInstallSnapshotReply");
1007
1008         MockRaftActorContext actorContext = createActorContextWithFollower();
1009
1010         final int commitIndex = 3;
1011         final int snapshotIndex = 2;
1012         final int snapshotTerm = 1;
1013         final int currentTerm = 2;
1014
1015         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1016             @Override
1017             public int getSnapshotChunkSize() {
1018                 return 50;
1019             }
1020         };
1021         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1022         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1023
1024         actorContext.setConfigParams(configParams);
1025         actorContext.setCommitIndex(commitIndex);
1026
1027         leader = new Leader(actorContext);
1028         actorContext.setCurrentBehavior(leader);
1029
1030         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1031         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1032
1033         Map<String, String> leadersSnapshot = new HashMap<>();
1034         leadersSnapshot.put("1", "A");
1035         leadersSnapshot.put("2", "B");
1036         leadersSnapshot.put("3", "C");
1037
1038         // set the snapshot variables in replicatedlog
1039         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1040         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1041         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1042
1043         ByteString bs = toByteString(leadersSnapshot);
1044         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1045                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1046
1047         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1048
1049         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1050                 InstallSnapshot.class);
1051
1052         assertEquals(1, installSnapshot.getChunkIndex());
1053         assertEquals(3, installSnapshot.getTotalChunks());
1054
1055         followerActor.underlyingActor().clear();
1056         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1057                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1058
1059         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1060
1061         assertEquals(2, installSnapshot.getChunkIndex());
1062         assertEquals(3, installSnapshot.getTotalChunks());
1063
1064         followerActor.underlyingActor().clear();
1065         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1066                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1067
1068         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069
1070         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1071         followerActor.underlyingActor().clear();
1072         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1073                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1074
1075         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1076
1077         assertNull(installSnapshot);
1078     }
1079
1080
1081     @Test
1082     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1083         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1084
1085         MockRaftActorContext actorContext = createActorContextWithFollower();
1086
1087         final int commitIndex = 3;
1088         final int snapshotIndex = 2;
1089         final int snapshotTerm = 1;
1090         final int currentTerm = 2;
1091
1092         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1093             @Override
1094             public int getSnapshotChunkSize() {
1095                 return 50;
1096             }
1097         });
1098
1099         actorContext.setCommitIndex(commitIndex);
1100
1101         leader = new Leader(actorContext);
1102
1103         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1104         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1105
1106         Map<String, String> leadersSnapshot = new HashMap<>();
1107         leadersSnapshot.put("1", "A");
1108         leadersSnapshot.put("2", "B");
1109         leadersSnapshot.put("3", "C");
1110
1111         // set the snapshot variables in replicatedlog
1112         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1113         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1114         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1115
1116         ByteString bs = toByteString(leadersSnapshot);
1117         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1118                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1119
1120         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1121         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1122
1123         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1124                 InstallSnapshot.class);
1125
1126         assertEquals(1, installSnapshot.getChunkIndex());
1127         assertEquals(3, installSnapshot.getTotalChunks());
1128
1129         followerActor.underlyingActor().clear();
1130
1131         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1132                 FOLLOWER_ID, -1, false));
1133
1134         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1135                 TimeUnit.MILLISECONDS);
1136
1137         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1138
1139         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1140
1141         assertEquals(1, installSnapshot.getChunkIndex());
1142         assertEquals(3, installSnapshot.getTotalChunks());
1143     }
1144
1145     @Test
1146     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1147         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1148
1149         MockRaftActorContext actorContext = createActorContextWithFollower();
1150
1151         final int commitIndex = 3;
1152         final int snapshotIndex = 2;
1153         final int snapshotTerm = 1;
1154         final int currentTerm = 2;
1155
1156         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1157             @Override
1158             public int getSnapshotChunkSize() {
1159                 return 50;
1160             }
1161         });
1162
1163         actorContext.setCommitIndex(commitIndex);
1164
1165         leader = new Leader(actorContext);
1166
1167         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1168         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1169
1170         Map<String, String> leadersSnapshot = new HashMap<>();
1171         leadersSnapshot.put("1", "A");
1172         leadersSnapshot.put("2", "B");
1173         leadersSnapshot.put("3", "C");
1174
1175         // set the snapshot variables in replicatedlog
1176         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1177         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1178         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1179
1180         ByteString bs = toByteString(leadersSnapshot);
1181         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1182                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1183
1184         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1185
1186         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1187                 InstallSnapshot.class);
1188
1189         assertEquals(1, installSnapshot.getChunkIndex());
1190         assertEquals(3, installSnapshot.getTotalChunks());
1191         assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
1192                 installSnapshot.getLastChunkHashCode());
1193
1194         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1195
1196         followerActor.underlyingActor().clear();
1197
1198         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1199                 FOLLOWER_ID, 1, true));
1200
1201         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1202
1203         assertEquals(2, installSnapshot.getChunkIndex());
1204         assertEquals(3, installSnapshot.getTotalChunks());
1205         assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
1206     }
1207
1208     @Test
1209     public void testLeaderInstallSnapshotState() throws IOException {
1210         logStart("testLeaderInstallSnapshotState");
1211
1212         Map<String, String> leadersSnapshot = new HashMap<>();
1213         leadersSnapshot.put("1", "A");
1214         leadersSnapshot.put("2", "B");
1215         leadersSnapshot.put("3", "C");
1216
1217         ByteString bs = toByteString(leadersSnapshot);
1218         byte[] barray = bs.toByteArray();
1219
1220         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1221         fts.setSnapshotBytes(ByteSource.wrap(barray));
1222
1223         assertEquals(bs.size(), barray.length);
1224
1225         int chunkIndex = 0;
1226         for (int i = 0; i < barray.length; i = i + 50) {
1227             int length = i + 50;
1228             chunkIndex++;
1229
1230             if (i + 50 > barray.length) {
1231                 length = barray.length;
1232             }
1233
1234             byte[] chunk = fts.getNextChunk();
1235             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1236             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1237
1238             fts.markSendStatus(true);
1239             if (!fts.isLastChunk(chunkIndex)) {
1240                 fts.incrementChunkIndex();
1241             }
1242         }
1243
1244         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1245         fts.close();
1246     }
1247
1248     @Override
1249     protected Leader createBehavior(final RaftActorContext actorContext) {
1250         return new Leader(actorContext);
1251     }
1252
1253     @Override
1254     protected MockRaftActorContext createActorContext() {
1255         return createActorContext(leaderActor);
1256     }
1257
1258     @Override
1259     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1260         return createActorContext(LEADER_ID, actorRef);
1261     }
1262
1263     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1264         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1265         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1266         configParams.setElectionTimeoutFactor(100000);
1267         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1268         context.setConfigParams(configParams);
1269         context.setPayloadVersion(payloadVersion);
1270         return context;
1271     }
1272
1273     private MockRaftActorContext createActorContextWithFollower() {
1274         MockRaftActorContext actorContext = createActorContext();
1275         actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
1276         return actorContext;
1277     }
1278
1279     private MockRaftActorContext createFollowerActorContextWithLeader() {
1280         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1281         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1282         followerConfig.setElectionTimeoutFactor(10000);
1283         followerActorContext.setConfigParams(followerConfig);
1284         followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1285         return followerActorContext;
1286     }
1287
1288     @Test
1289     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1290         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1291
1292         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1293
1294         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1295
1296         Follower follower = new Follower(followerActorContext);
1297         followerActor.underlyingActor().setBehavior(follower);
1298         followerActorContext.setCurrentBehavior(follower);
1299
1300         Map<String, String> peerAddresses = new HashMap<>();
1301         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1302
1303         leaderActorContext.setPeerAddresses(peerAddresses);
1304
1305         leaderActorContext.getReplicatedLog().removeFrom(0);
1306
1307         //create 3 entries
1308         leaderActorContext.setReplicatedLog(
1309                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1310
1311         leaderActorContext.setCommitIndex(1);
1312
1313         followerActorContext.getReplicatedLog().removeFrom(0);
1314
1315         // follower too has the exact same log entries and has the same commit index
1316         followerActorContext.setReplicatedLog(
1317                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1318
1319         followerActorContext.setCommitIndex(1);
1320
1321         leader = new Leader(leaderActorContext);
1322         leaderActorContext.setCurrentBehavior(leader);
1323
1324         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1325
1326         assertEquals(-1, appendEntries.getLeaderCommit());
1327         assertEquals(0, appendEntries.getEntries().size());
1328         assertEquals(0, appendEntries.getPrevLogIndex());
1329
1330         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1331                 leaderActor, AppendEntriesReply.class);
1332
1333         assertEquals(2, appendEntriesReply.getLogLastIndex());
1334         assertEquals(1, appendEntriesReply.getLogLastTerm());
1335
1336         // follower returns its next index
1337         assertEquals(2, appendEntriesReply.getLogLastIndex());
1338         assertEquals(1, appendEntriesReply.getLogLastTerm());
1339
1340         follower.close();
1341     }
1342
1343     @Test
1344     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1345         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1346
1347         final MockRaftActorContext leaderActorContext = createActorContext();
1348
1349         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1350         followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1351
1352         Follower follower = new Follower(followerActorContext);
1353         followerActor.underlyingActor().setBehavior(follower);
1354         followerActorContext.setCurrentBehavior(follower);
1355
1356         Map<String, String> leaderPeerAddresses = new HashMap<>();
1357         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1358
1359         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1360
1361         leaderActorContext.getReplicatedLog().removeFrom(0);
1362
1363         leaderActorContext.setReplicatedLog(
1364                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1365
1366         leaderActorContext.setCommitIndex(1);
1367
1368         followerActorContext.getReplicatedLog().removeFrom(0);
1369
1370         followerActorContext.setReplicatedLog(
1371                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1372
1373         // follower has the same log entries but its commit index > leaders commit index
1374         followerActorContext.setCommitIndex(2);
1375
1376         leader = new Leader(leaderActorContext);
1377
1378         // Initial heartbeat
1379         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1380
1381         assertEquals(-1, appendEntries.getLeaderCommit());
1382         assertEquals(0, appendEntries.getEntries().size());
1383         assertEquals(0, appendEntries.getPrevLogIndex());
1384
1385         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1386                 leaderActor, AppendEntriesReply.class);
1387
1388         assertEquals(2, appendEntriesReply.getLogLastIndex());
1389         assertEquals(1, appendEntriesReply.getLogLastTerm());
1390
1391         leaderActor.underlyingActor().setBehavior(follower);
1392         leader.handleMessage(followerActor, appendEntriesReply);
1393
1394         leaderActor.underlyingActor().clear();
1395         followerActor.underlyingActor().clear();
1396
1397         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1398                 TimeUnit.MILLISECONDS);
1399
1400         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1401
1402         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1403
1404         assertEquals(2, appendEntries.getLeaderCommit());
1405         assertEquals(0, appendEntries.getEntries().size());
1406         assertEquals(2, appendEntries.getPrevLogIndex());
1407
1408         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1409
1410         assertEquals(2, appendEntriesReply.getLogLastIndex());
1411         assertEquals(1, appendEntriesReply.getLogLastTerm());
1412
1413         assertEquals(2, followerActorContext.getCommitIndex());
1414
1415         follower.close();
1416     }
1417
1418     @Test
1419     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1420         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1421
1422         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1423         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1424                 new FiniteDuration(1000, TimeUnit.SECONDS));
1425
1426         leaderActorContext.setReplicatedLog(
1427                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1428         long leaderCommitIndex = 2;
1429         leaderActorContext.setCommitIndex(leaderCommitIndex);
1430         leaderActorContext.setLastApplied(leaderCommitIndex);
1431
1432         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1433         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1434
1435         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1436
1437         followerActorContext.setReplicatedLog(
1438                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1439         followerActorContext.setCommitIndex(0);
1440         followerActorContext.setLastApplied(0);
1441
1442         Follower follower = new Follower(followerActorContext);
1443         followerActor.underlyingActor().setBehavior(follower);
1444
1445         leader = new Leader(leaderActorContext);
1446
1447         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1448         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1449                 AppendEntriesReply.class);
1450
1451         MessageCollectorActor.clearMessages(followerActor);
1452         MessageCollectorActor.clearMessages(leaderActor);
1453
1454         // Verify initial AppendEntries sent.
1455         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1456         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1457         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1458
1459         leaderActor.underlyingActor().setBehavior(leader);
1460
1461         leader.handleMessage(followerActor, appendEntriesReply);
1462
1463         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1464         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1465
1466         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1467         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1468         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1469
1470         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1471         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1472                 appendEntries.getEntries().get(0).getData());
1473         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1474         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1475                 appendEntries.getEntries().get(1).getData());
1476
1477         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1478         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1479
1480         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1481
1482         ApplyState applyState = applyStateList.get(0);
1483         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1484         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1485         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1486                 applyState.getReplicatedLogEntry().getData());
1487
1488         applyState = applyStateList.get(1);
1489         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1490         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1491         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1492                 applyState.getReplicatedLogEntry().getData());
1493
1494         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1495         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1496     }
1497
1498     @Test
1499     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1500         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1501
1502         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1503         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1504                 new FiniteDuration(1000, TimeUnit.SECONDS));
1505
1506         leaderActorContext.setReplicatedLog(
1507                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1508         long leaderCommitIndex = 1;
1509         leaderActorContext.setCommitIndex(leaderCommitIndex);
1510         leaderActorContext.setLastApplied(leaderCommitIndex);
1511
1512         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1513         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1514
1515         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1516
1517         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1518         followerActorContext.setCommitIndex(-1);
1519         followerActorContext.setLastApplied(-1);
1520
1521         Follower follower = new Follower(followerActorContext);
1522         followerActor.underlyingActor().setBehavior(follower);
1523         followerActorContext.setCurrentBehavior(follower);
1524
1525         leader = new Leader(leaderActorContext);
1526
1527         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1528         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1529                 AppendEntriesReply.class);
1530
1531         MessageCollectorActor.clearMessages(followerActor);
1532         MessageCollectorActor.clearMessages(leaderActor);
1533
1534         // Verify initial AppendEntries sent with the leader's current commit index.
1535         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1536         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1537         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1538
1539         leaderActor.underlyingActor().setBehavior(leader);
1540         leaderActorContext.setCurrentBehavior(leader);
1541
1542         leader.handleMessage(followerActor, appendEntriesReply);
1543
1544         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1545         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1546
1547         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1548         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1549         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1550
1551         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1552         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1553                 appendEntries.getEntries().get(0).getData());
1554         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1555         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1556                 appendEntries.getEntries().get(1).getData());
1557
1558         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1559         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1560
1561         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1562
1563         ApplyState applyState = applyStateList.get(0);
1564         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1565         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1566         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1567                 applyState.getReplicatedLogEntry().getData());
1568
1569         applyState = applyStateList.get(1);
1570         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1571         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1572         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1573                 applyState.getReplicatedLogEntry().getData());
1574
1575         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1576         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1577     }
1578
1579     @Test
1580     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1581         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1582
1583         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1584         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1585                 new FiniteDuration(1000, TimeUnit.SECONDS));
1586
1587         leaderActorContext.setReplicatedLog(
1588                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1589         long leaderCommitIndex = 1;
1590         leaderActorContext.setCommitIndex(leaderCommitIndex);
1591         leaderActorContext.setLastApplied(leaderCommitIndex);
1592
1593         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1594         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1595
1596         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1597
1598         followerActorContext.setReplicatedLog(
1599                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1600         followerActorContext.setCommitIndex(-1);
1601         followerActorContext.setLastApplied(-1);
1602
1603         Follower follower = new Follower(followerActorContext);
1604         followerActor.underlyingActor().setBehavior(follower);
1605         followerActorContext.setCurrentBehavior(follower);
1606
1607         leader = new Leader(leaderActorContext);
1608
1609         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1610         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1611                 AppendEntriesReply.class);
1612
1613         MessageCollectorActor.clearMessages(followerActor);
1614         MessageCollectorActor.clearMessages(leaderActor);
1615
1616         // Verify initial AppendEntries sent with the leader's current commit index.
1617         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1618         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1619         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1620
1621         leaderActor.underlyingActor().setBehavior(leader);
1622         leaderActorContext.setCurrentBehavior(leader);
1623
1624         leader.handleMessage(followerActor, appendEntriesReply);
1625
1626         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1627         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1628
1629         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1630         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1631         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1632
1633         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1634         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1635         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1636                 appendEntries.getEntries().get(0).getData());
1637         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1638         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1639         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1640                 appendEntries.getEntries().get(1).getData());
1641
1642         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1643         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1644
1645         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1646
1647         ApplyState applyState = applyStateList.get(0);
1648         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1649         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1650         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1651                 applyState.getReplicatedLogEntry().getData());
1652
1653         applyState = applyStateList.get(1);
1654         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1655         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1656         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1657                 applyState.getReplicatedLogEntry().getData());
1658
1659         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1660         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1661         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1662     }
1663
1664     @Test
1665     public void testHandleAppendEntriesReplyWithNewerTerm() {
1666         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1667
1668         MockRaftActorContext leaderActorContext = createActorContext();
1669         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1670                 new FiniteDuration(10000, TimeUnit.SECONDS));
1671
1672         leaderActorContext.setReplicatedLog(
1673                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1674
1675         leader = new Leader(leaderActorContext);
1676         leaderActor.underlyingActor().setBehavior(leader);
1677         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1678
1679         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1680                 AppendEntriesReply.class);
1681
1682         assertEquals(false, appendEntriesReply.isSuccess());
1683         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1684
1685         MessageCollectorActor.clearMessages(leaderActor);
1686     }
1687
1688     @Test
1689     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1690         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1691
1692         MockRaftActorContext leaderActorContext = createActorContext();
1693         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1694                 new FiniteDuration(10000, TimeUnit.SECONDS));
1695
1696         leaderActorContext.setReplicatedLog(
1697                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1698         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1699
1700         leader = new Leader(leaderActorContext);
1701         leaderActor.underlyingActor().setBehavior(leader);
1702         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1703
1704         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1705                 AppendEntriesReply.class);
1706
1707         assertEquals(false, appendEntriesReply.isSuccess());
1708         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1709
1710         MessageCollectorActor.clearMessages(leaderActor);
1711     }
1712
1713     @Test
1714     public void testHandleAppendEntriesReplySuccess() {
1715         logStart("testHandleAppendEntriesReplySuccess");
1716
1717         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1718
1719         leaderActorContext.setReplicatedLog(
1720                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1721
1722         leaderActorContext.setCommitIndex(1);
1723         leaderActorContext.setLastApplied(1);
1724         leaderActorContext.getTermInformation().update(1, "leader");
1725
1726         leader = new Leader(leaderActorContext);
1727
1728         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1729
1730         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1731         assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
1732
1733         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1734
1735         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1736
1737         assertEquals(RaftState.Leader, raftActorBehavior.state());
1738
1739         assertEquals(2, leaderActorContext.getCommitIndex());
1740
1741         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1742                 leaderActor, ApplyJournalEntries.class);
1743
1744         assertEquals(2, leaderActorContext.getLastApplied());
1745
1746         assertEquals(2, applyJournalEntries.getToIndex());
1747
1748         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1749                 ApplyState.class);
1750
1751         assertEquals(1,applyStateList.size());
1752
1753         ApplyState applyState = applyStateList.get(0);
1754
1755         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1756
1757         assertEquals(2, followerInfo.getMatchIndex());
1758         assertEquals(3, followerInfo.getNextIndex());
1759         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1760         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1761     }
1762
1763     @Test
1764     public void testHandleAppendEntriesReplyUnknownFollower() {
1765         logStart("testHandleAppendEntriesReplyUnknownFollower");
1766
1767         MockRaftActorContext leaderActorContext = createActorContext();
1768
1769         leader = new Leader(leaderActorContext);
1770
1771         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1772
1773         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1774
1775         assertEquals(RaftState.Leader, raftActorBehavior.state());
1776     }
1777
1778     @Test
1779     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1780         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1781
1782         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1783         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1784                 new FiniteDuration(1000, TimeUnit.SECONDS));
1785         // Note: the size here depends on estimate
1786         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1787
1788         leaderActorContext.setReplicatedLog(
1789                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1790         long leaderCommitIndex = 3;
1791         leaderActorContext.setCommitIndex(leaderCommitIndex);
1792         leaderActorContext.setLastApplied(leaderCommitIndex);
1793
1794         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1795         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1796         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1797         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1798
1799         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1800
1801         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1802         followerActorContext.setCommitIndex(-1);
1803         followerActorContext.setLastApplied(-1);
1804
1805         Follower follower = new Follower(followerActorContext);
1806         followerActor.underlyingActor().setBehavior(follower);
1807         followerActorContext.setCurrentBehavior(follower);
1808
1809         leader = new Leader(leaderActorContext);
1810
1811         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1812         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1813                 AppendEntriesReply.class);
1814
1815         MessageCollectorActor.clearMessages(followerActor);
1816         MessageCollectorActor.clearMessages(leaderActor);
1817
1818         // Verify initial AppendEntries sent with the leader's current commit index.
1819         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1820         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1821         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1822
1823         leaderActor.underlyingActor().setBehavior(leader);
1824         leaderActorContext.setCurrentBehavior(leader);
1825
1826         leader.handleMessage(followerActor, appendEntriesReply);
1827
1828         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1829                 AppendEntries.class, 2);
1830         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1831
1832         appendEntries = appendEntriesList.get(0);
1833         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1834         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1835         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1836
1837         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1838         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1839                 appendEntries.getEntries().get(0).getData());
1840         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1841         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1842                 appendEntries.getEntries().get(1).getData());
1843
1844         appendEntries = appendEntriesList.get(1);
1845         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1846         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1847         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1848
1849         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1850         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1851                 appendEntries.getEntries().get(0).getData());
1852         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1853         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1854                 appendEntries.getEntries().get(1).getData());
1855
1856         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1857         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1858
1859         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1860
1861         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1862         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1863     }
1864
1865     @Test
1866     public void testHandleRequestVoteReply() {
1867         logStart("testHandleRequestVoteReply");
1868
1869         MockRaftActorContext leaderActorContext = createActorContext();
1870
1871         leader = new Leader(leaderActorContext);
1872
1873         // Should be a no-op.
1874         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1875                 new RequestVoteReply(1, true));
1876
1877         assertEquals(RaftState.Leader, raftActorBehavior.state());
1878
1879         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1880
1881         assertEquals(RaftState.Leader, raftActorBehavior.state());
1882     }
1883
1884     @Test
1885     public void testIsolatedLeaderCheckNoFollowers() {
1886         logStart("testIsolatedLeaderCheckNoFollowers");
1887
1888         MockRaftActorContext leaderActorContext = createActorContext();
1889
1890         leader = new Leader(leaderActorContext);
1891         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1892         assertTrue(newBehavior instanceof Leader);
1893     }
1894
1895     @Test
1896     public void testIsolatedLeaderCheckNoVotingFollowers() {
1897         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1898
1899         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1900         Follower follower = new Follower(followerActorContext);
1901         followerActor.underlyingActor().setBehavior(follower);
1902
1903         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1904         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1905                 new FiniteDuration(1000, TimeUnit.SECONDS));
1906         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1907
1908         leader = new Leader(leaderActorContext);
1909         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1910         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1911         assertTrue("Expected Leader", newBehavior instanceof Leader);
1912     }
1913
1914     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1915         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1916         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1917
1918         MockRaftActorContext leaderActorContext = createActorContext();
1919
1920         Map<String, String> peerAddresses = new HashMap<>();
1921         peerAddresses.put("follower-1", followerActor1.path().toString());
1922         peerAddresses.put("follower-2", followerActor2.path().toString());
1923
1924         leaderActorContext.setPeerAddresses(peerAddresses);
1925         leaderActorContext.setRaftPolicy(raftPolicy);
1926
1927         leader = new Leader(leaderActorContext);
1928
1929         leader.markFollowerActive("follower-1");
1930         leader.markFollowerActive("follower-2");
1931         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1932         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1933
1934         // kill 1 follower and verify if that got killed
1935         final TestKit probe = new TestKit(getSystem());
1936         probe.watch(followerActor1);
1937         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1938         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1939         assertEquals(termMsg1.getActor(), followerActor1);
1940
1941         leader.markFollowerInActive("follower-1");
1942         leader.markFollowerActive("follower-2");
1943         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1944         assertTrue("Behavior not instance of Leader when majority of followers are active",
1945                 newBehavior instanceof Leader);
1946
1947         // kill 2nd follower and leader should change to Isolated leader
1948         followerActor2.tell(PoisonPill.getInstance(), null);
1949         probe.watch(followerActor2);
1950         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1951         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1952         assertEquals(termMsg2.getActor(), followerActor2);
1953
1954         leader.markFollowerInActive("follower-2");
1955         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1956     }
1957
1958     @Test
1959     public void testIsolatedLeaderCheckTwoFollowers() {
1960         logStart("testIsolatedLeaderCheckTwoFollowers");
1961
1962         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1963
1964         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1965             newBehavior instanceof IsolatedLeader);
1966     }
1967
1968     @Test
1969     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1970         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1971
1972         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1973
1974         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1975                 newBehavior instanceof Leader);
1976     }
1977
1978     @Test
1979     public void testLaggingFollowerStarvation() {
1980         logStart("testLaggingFollowerStarvation");
1981
1982         String leaderActorId = actorFactory.generateActorId("leader");
1983         String follower1ActorId = actorFactory.generateActorId("follower");
1984         String follower2ActorId = actorFactory.generateActorId("follower");
1985
1986         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1987         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1988
1989         MockRaftActorContext leaderActorContext =
1990                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1991
1992         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1993         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1994         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1995
1996         leaderActorContext.setConfigParams(configParams);
1997
1998         leaderActorContext.setReplicatedLog(
1999                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2000
2001         Map<String, String> peerAddresses = new HashMap<>();
2002         peerAddresses.put(follower1ActorId,
2003                 follower1Actor.path().toString());
2004         peerAddresses.put(follower2ActorId,
2005                 follower2Actor.path().toString());
2006
2007         leaderActorContext.setPeerAddresses(peerAddresses);
2008         leaderActorContext.getTermInformation().update(1, leaderActorId);
2009
2010         leader = createBehavior(leaderActorContext);
2011
2012         leaderActor.underlyingActor().setBehavior(leader);
2013
2014         for (int i = 1; i < 6; i++) {
2015             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2016             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2017                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2018             assertTrue(newBehavior == leader);
2019             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2020         }
2021
2022         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2023         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2024
2025         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2026                 heartbeats.size() > 1);
2027
2028         // Check if follower-2 got AppendEntries during this time and was not starved
2029         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2030
2031         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2032                 appendEntries.size() > 1);
2033     }
2034
2035     @Test
2036     public void testReplicationConsensusWithNonVotingFollower() {
2037         logStart("testReplicationConsensusWithNonVotingFollower");
2038
2039         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2040         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2041                 new FiniteDuration(1000, TimeUnit.SECONDS));
2042
2043         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2044         leaderActorContext.setCommitIndex(-1);
2045         leaderActorContext.setLastApplied(-1);
2046
2047         String nonVotingFollowerId = "nonvoting-follower";
2048         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2049                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2050
2051         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2052                 VotingState.NON_VOTING);
2053
2054         leader = new Leader(leaderActorContext);
2055         leaderActorContext.setCurrentBehavior(leader);
2056
2057         // Ignore initial heartbeats
2058         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2059         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2060
2061         MessageCollectorActor.clearMessages(followerActor);
2062         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2063         MessageCollectorActor.clearMessages(leaderActor);
2064
2065         // Send a Replicate message and wait for AppendEntries.
2066         sendReplicate(leaderActorContext, 0);
2067
2068         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2069         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2070
2071         // Send reply only from the voting follower and verify consensus via ApplyState.
2072         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2073
2074         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2075
2076         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2077
2078         MessageCollectorActor.clearMessages(followerActor);
2079         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2080         MessageCollectorActor.clearMessages(leaderActor);
2081
2082         // Send another Replicate message
2083         sendReplicate(leaderActorContext, 1);
2084
2085         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2086         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2087                 AppendEntries.class);
2088         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2089         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2090
2091         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2092         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2093
2094         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2095
2096         // Send reply from the voting follower and verify consensus.
2097         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2098
2099         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2100     }
2101
2102     @Test
2103     public void testTransferLeadershipWithFollowerInSync() {
2104         logStart("testTransferLeadershipWithFollowerInSync");
2105
2106         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2107         leaderActorContext.setLastApplied(-1);
2108         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2109                 new FiniteDuration(1000, TimeUnit.SECONDS));
2110         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2111
2112         leader = new Leader(leaderActorContext);
2113         leaderActorContext.setCurrentBehavior(leader);
2114
2115         // Initial heartbeat
2116         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2117         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2118         MessageCollectorActor.clearMessages(followerActor);
2119
2120         sendReplicate(leaderActorContext, 0);
2121         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2122
2123         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2124         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2125         MessageCollectorActor.clearMessages(followerActor);
2126
2127         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2128         leader.transferLeadership(mockTransferCohort);
2129
2130         verify(mockTransferCohort, never()).transferComplete();
2131         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2132         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2134
2135         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2136         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2137
2138         // Leader should force an election timeout
2139         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2140
2141         verify(mockTransferCohort).transferComplete();
2142     }
2143
2144     @Test
2145     public void testTransferLeadershipWithEmptyLog() {
2146         logStart("testTransferLeadershipWithEmptyLog");
2147
2148         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2149         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2150                 new FiniteDuration(1000, TimeUnit.SECONDS));
2151         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2152
2153         leader = new Leader(leaderActorContext);
2154         leaderActorContext.setCurrentBehavior(leader);
2155
2156         // Initial heartbeat
2157         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2158         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2159         MessageCollectorActor.clearMessages(followerActor);
2160
2161         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2162         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2163         leader.transferLeadership(mockTransferCohort);
2164
2165         verify(mockTransferCohort, never()).transferComplete();
2166         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2167         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2168
2169         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2170         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2171
2172         // Leader should force an election timeout
2173         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2174
2175         verify(mockTransferCohort).transferComplete();
2176     }
2177
2178     @Test
2179     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2180         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2181
2182         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2183         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2184                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2185
2186         leader = new Leader(leaderActorContext);
2187         leaderActorContext.setCurrentBehavior(leader);
2188
2189         // Initial heartbeat
2190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2191         MessageCollectorActor.clearMessages(followerActor);
2192
2193         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2194         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2195         leader.transferLeadership(mockTransferCohort);
2196
2197         verify(mockTransferCohort, never()).transferComplete();
2198
2199         // Sync up the follower.
2200         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2201         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2202         MessageCollectorActor.clearMessages(followerActor);
2203
2204         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2205                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2206         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2207         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2208         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2209
2210         // Leader should force an election timeout
2211         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2212
2213         verify(mockTransferCohort).transferComplete();
2214     }
2215
2216     @Test
2217     public void testTransferLeadershipWithFollowerSyncTimeout() {
2218         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2219
2220         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2221         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2222                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2223         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2224         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2225
2226         leader = new Leader(leaderActorContext);
2227         leaderActorContext.setCurrentBehavior(leader);
2228
2229         // Initial heartbeat
2230         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2231         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2232         MessageCollectorActor.clearMessages(followerActor);
2233
2234         sendReplicate(leaderActorContext, 0);
2235         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2236
2237         MessageCollectorActor.clearMessages(followerActor);
2238
2239         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2240         leader.transferLeadership(mockTransferCohort);
2241
2242         verify(mockTransferCohort, never()).transferComplete();
2243
2244         // Send heartbeats to time out the transfer.
2245         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2246             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2247                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2248             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2249         }
2250
2251         verify(mockTransferCohort).abortTransfer();
2252         verify(mockTransferCohort, never()).transferComplete();
2253         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2254     }
2255
2256     @Test
2257     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2258         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2259
2260         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2261                 List.of(new SimpleReplicatedLogEntry(0, 1,
2262                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2263         final MockRaftActorContext.MockPayload largePayload =
2264                 new MockRaftActorContext.MockPayload("large", serializedSize);
2265
2266         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2267         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2268                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2269         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2270         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2271         leaderActorContext.setCommitIndex(-1);
2272         leaderActorContext.setLastApplied(-1);
2273
2274         leader = new Leader(leaderActorContext);
2275         leaderActorContext.setCurrentBehavior(leader);
2276
2277         // Send initial heartbeat reply so follower is marked active
2278         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2279         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2280         MessageCollectorActor.clearMessages(followerActor);
2281
2282         // Send normal payload first to prime commit index.
2283         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2284         sendReplicate(leaderActorContext, term, 0);
2285
2286         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2287         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2288         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2289
2290         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2291         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2292         MessageCollectorActor.clearMessages(followerActor);
2293
2294         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2295         sendReplicate(leaderActorContext, term, 1, largePayload);
2296
2297         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2298         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2299         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2300
2301         final Identifier slicingId = messageSlice.getIdentifier();
2302
2303         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2304         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2305         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2306         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2307         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2308         MessageCollectorActor.clearMessages(followerActor);
2309
2310         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2311
2312         // Sleep for the heartbeat interval so AppendEntries is sent.
2313         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2314                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2315
2316         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2317
2318         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2319         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2320         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2321         MessageCollectorActor.clearMessages(followerActor);
2322
2323         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2324
2325         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2326         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2327         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2328
2329         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2330
2331         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2332
2333         MessageCollectorActor.clearMessages(followerActor);
2334
2335         // Send another normal payload.
2336
2337         sendReplicate(leaderActorContext, term, 2);
2338
2339         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2340         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2341         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2342         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2343     }
2344
2345     @Test
2346     public void testLargePayloadSlicingExpiration() {
2347         logStart("testLargePayloadSlicingExpiration");
2348
2349         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2350         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2351                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2352         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2353         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2354         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2355         leaderActorContext.setCommitIndex(-1);
2356         leaderActorContext.setLastApplied(-1);
2357
2358         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2359         leader = new Leader(leaderActorContext);
2360         leaderActorContext.setCurrentBehavior(leader);
2361
2362         // Send initial heartbeat reply so follower is marked active
2363         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2364         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2365         MessageCollectorActor.clearMessages(followerActor);
2366
2367         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2368                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2369         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2370
2371         // Sleep for at least 3 * election timeout so the slicing state expires.
2372         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2373                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2374         MessageCollectorActor.clearMessages(followerActor);
2375
2376         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2377
2378         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2379         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2380         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2381
2382         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2383         MessageCollectorActor.clearMessages(followerActor);
2384
2385         // Send an AppendEntriesReply - this should restart the slicing.
2386
2387         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2388                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2389
2390         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2391
2392         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2393     }
2394
2395     @Test
2396     public void testLeaderAddressInAppendEntries() {
2397         logStart("testLeaderAddressInAppendEntries");
2398
2399         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2400         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2401                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2402         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2403         leaderActorContext.setCommitIndex(-1);
2404         leaderActorContext.setLastApplied(-1);
2405
2406         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2407             peerId -> leaderActor.path().toString());
2408
2409         leader = new Leader(leaderActorContext);
2410         leaderActorContext.setCurrentBehavior(leader);
2411
2412         // Initial heartbeat shouldn't have the leader address
2413
2414         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2415         assertFalse(appendEntries.getLeaderAddress().isPresent());
2416         MessageCollectorActor.clearMessages(followerActor);
2417
2418         // Send AppendEntriesReply indicating the follower needs the leader address
2419
2420         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2421                 RaftVersions.CURRENT_VERSION));
2422
2423         // Sleep for the heartbeat interval so AppendEntries is sent.
2424         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2425                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2426
2427         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2428
2429         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2430         assertTrue(appendEntries.getLeaderAddress().isPresent());
2431         assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().orElseThrow());
2432         MessageCollectorActor.clearMessages(followerActor);
2433
2434         // Send AppendEntriesReply indicating the follower does not need the leader address
2435
2436         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2437                 RaftVersions.CURRENT_VERSION));
2438
2439         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2440                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2441
2442         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2443
2444         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2445         assertFalse(appendEntries.getLeaderAddress().isPresent());
2446     }
2447
2448     @Override
2449     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2450             final ActorRef actorRef, final RaftRPC rpc) {
2451         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2452         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2453     }
2454
2455     private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2456
2457         private final long electionTimeOutIntervalMillis;
2458         private final int snapshotChunkSize;
2459
2460         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2461             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2462             this.snapshotChunkSize = snapshotChunkSize;
2463         }
2464
2465         @Override
2466         public FiniteDuration getElectionTimeOutInterval() {
2467             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2468         }
2469
2470         @Override
2471         public int getSnapshotChunkSize() {
2472             return snapshotChunkSize;
2473         }
2474     }
2475 }