Modernize sal-akka-raft tests
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.io.ByteSource;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.apache.commons.lang3.SerializationUtils;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.messaging.MessageSlice;
43 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
46 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
47 import org.opendaylight.controller.cluster.raft.RaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
49 import org.opendaylight.controller.cluster.raft.RaftState;
50 import org.opendaylight.controller.cluster.raft.RaftVersions;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.VotingState;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
57 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
58 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
60 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.messages.Payload;
66 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
68 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
69 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
70 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
72 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
74 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import org.opendaylight.yangtools.concepts.Identifier;
77 import scala.concurrent.duration.FiniteDuration;
78
79 public class LeaderTest extends AbstractLeaderTest<Leader> {
80
81     static final String FOLLOWER_ID = "follower";
82     public static final String LEADER_ID = "leader";
83
84     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
85             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
86
87     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
88             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
89
90     private Leader leader;
91     private final short payloadVersion = 5;
92
93     @Override
94     @After
95     public void tearDown() {
96         if (leader != null) {
97             leader.close();
98         }
99
100         super.tearDown();
101     }
102
103     @Test
104     public void testHandleMessageForUnknownMessage() {
105         logStart("testHandleMessageForUnknownMessage");
106
107         leader = new Leader(createActorContext());
108
109         // handle message should null when it receives an unknown message
110         assertNull(leader.handleMessage(followerActor, "foo"));
111     }
112
113     @Test
114     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
115         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
116
117         MockRaftActorContext actorContext = createActorContextWithFollower();
118         actorContext.setCommitIndex(-1);
119         actorContext.setPayloadVersion(payloadVersion);
120
121         long term = 1;
122         actorContext.getTermInformation().update(term, "");
123
124         leader = new Leader(actorContext);
125         actorContext.setCurrentBehavior(leader);
126
127         // Leader should send an immediate heartbeat with no entries as follower is inactive.
128         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
129         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
130         assertEquals("getTerm", term, appendEntries.getTerm());
131         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
132         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
133         assertEquals("Entries size", 0, appendEntries.getEntries().size());
134         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
135
136         // The follower would normally reply - simulate that explicitly here.
137         leader.handleMessage(followerActor, new AppendEntriesReply(
138                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
139         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
140
141         followerActor.underlyingActor().clear();
142
143         // Sleep for the heartbeat interval so AppendEntries is sent.
144         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
145                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
146
147         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
148
149         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
150         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
151         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
152         assertEquals("Entries size", 1, appendEntries.getEntries().size());
153         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
154         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
155         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
156     }
157
158
159     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
160         return sendReplicate(actorContext, 1, index);
161     }
162
163     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
164             final long index) {
165         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
166     }
167
168     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
169             final Payload payload) {
170         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
171         actorContext.getReplicatedLog().append(newEntry);
172         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
173     }
174
175     @Test
176     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
177         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
178
179         MockRaftActorContext actorContext = createActorContextWithFollower();
180
181         long term = 1;
182         actorContext.getTermInformation().update(term, "");
183
184         leader = new Leader(actorContext);
185
186         // Leader will send an immediate heartbeat - ignore it.
187         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
188
189         // The follower would normally reply - simulate that explicitly here.
190         long lastIndex = actorContext.getReplicatedLog().lastIndex();
191         leader.handleMessage(followerActor, new AppendEntriesReply(
192                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
193         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
194
195         followerActor.underlyingActor().clear();
196
197         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
198
199         // State should not change
200         assertTrue(raftBehavior instanceof Leader);
201
202         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
203         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
204         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
205         assertEquals("Entries size", 1, appendEntries.getEntries().size());
206         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
207         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
208         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
209         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
210     }
211
212     @Test
213     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
214         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
215
216         MockRaftActorContext actorContext = createActorContextWithFollower();
217         actorContext.setCommitIndex(-1);
218         actorContext.setLastApplied(-1);
219
220         // The raft context is initialized with a couple log entries. However the commitIndex
221         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
222         // committed and applied. Now it regains leadership with a higher term (2).
223         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
224         long newTerm = prevTerm + 1;
225         actorContext.getTermInformation().update(newTerm, "");
226
227         leader = new Leader(actorContext);
228         actorContext.setCurrentBehavior(leader);
229
230         // Leader will send an immediate heartbeat - ignore it.
231         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
232
233         // The follower replies with the leader's current last index and term, simulating that it is
234         // up to date with the leader.
235         long lastIndex = actorContext.getReplicatedLog().lastIndex();
236         leader.handleMessage(followerActor, new AppendEntriesReply(
237                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
238
239         // The commit index should not get updated even though consensus was reached. This is b/c the
240         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
241         // from previous terms by counting replicas".
242         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
243
244         followerActor.underlyingActor().clear();
245
246         // Now replicate a new entry with the new term 2.
247         long newIndex = lastIndex + 1;
248         sendReplicate(actorContext, newTerm, newIndex);
249
250         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
251         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
252         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
253         assertEquals("Entries size", 1, appendEntries.getEntries().size());
254         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
255         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
256         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
257
258         // The follower replies with success. The leader should now update the commit index to the new index
259         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
260         // prior entries are committed indirectly".
261         leader.handleMessage(followerActor, new AppendEntriesReply(
262                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
263
264         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
265     }
266
267     @Test
268     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
269         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
270
271         MockRaftActorContext actorContext = createActorContextWithFollower();
272         actorContext.setRaftPolicy(createRaftPolicy(true, true));
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
291
292         // State should not change
293         assertTrue(raftBehavior instanceof Leader);
294
295         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
296         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
297         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
298         assertEquals("Entries size", 1, appendEntries.getEntries().size());
299         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
300         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
301         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
302         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
303     }
304
305     @Test
306     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
307         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
308
309         MockRaftActorContext actorContext = createActorContextWithFollower();
310         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
311             @Override
312             public FiniteDuration getHeartBeatInterval() {
313                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
314             }
315         });
316
317         long term = 1;
318         actorContext.getTermInformation().update(term, "");
319
320         leader = new Leader(actorContext);
321
322         // Leader will send an immediate heartbeat - ignore it.
323         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
324
325         // The follower would normally reply - simulate that explicitly here.
326         long lastIndex = actorContext.getReplicatedLog().lastIndex();
327         leader.handleMessage(followerActor, new AppendEntriesReply(
328                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
329         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
330
331         followerActor.underlyingActor().clear();
332
333         for (int i = 0; i < 5; i++) {
334             sendReplicate(actorContext, lastIndex + i + 1);
335         }
336
337         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
338         // We expect only 1 message to be sent because of two reasons,
339         // - an append entries reply was not received
340         // - the heartbeat interval has not expired
341         // In this scenario if multiple messages are sent they would likely be duplicates
342         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
343     }
344
345     @Test
346     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
347         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
348
349         MockRaftActorContext actorContext = createActorContextWithFollower();
350         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
351             @Override
352             public FiniteDuration getHeartBeatInterval() {
353                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
354             }
355         });
356
357         long term = 1;
358         actorContext.getTermInformation().update(term, "");
359
360         leader = new Leader(actorContext);
361
362         // Leader will send an immediate heartbeat - ignore it.
363         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
364
365         // The follower would normally reply - simulate that explicitly here.
366         long lastIndex = actorContext.getReplicatedLog().lastIndex();
367         leader.handleMessage(followerActor, new AppendEntriesReply(
368                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
369         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
370
371         followerActor.underlyingActor().clear();
372
373         for (int i = 0; i < 3; i++) {
374             sendReplicate(actorContext, lastIndex + i + 1);
375             leader.handleMessage(followerActor, new AppendEntriesReply(
376                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
377         }
378
379         // We are expecting six messages here -- a request to replicate and a consensus-reached message
380         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
381         assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
382         for (int i = 0; i < 3; i++) {
383             assertRequestEntry(lastIndex, allMessages, i);
384             assertCommitEntry(lastIndex, allMessages, i);
385         }
386
387         // Now perform another commit, eliciting a request to persist
388         sendReplicate(actorContext, lastIndex + 3 + 1);
389         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
390         // This elicits another message for request to replicate
391         assertEquals("The number of request entries collected", 7, allMessages.size());
392         assertRequestEntry(lastIndex, allMessages, 3);
393
394         sendReplicate(actorContext, lastIndex + 4 + 1);
395         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
396         assertEquals("The number of request entries collected", 7, allMessages.size());
397     }
398
399     private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
400             final int messageNr) {
401         final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
402         assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
403         assertEquals(List.of(), commitReq.getEntries());
404     }
405
406     private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
407             final int messageNr) {
408         final AppendEntries req = allMessages.get(2 * messageNr);
409         assertEquals(lastIndex + messageNr, req.getLeaderCommit());
410
411         final List<ReplicatedLogEntry> entries = req.getEntries();
412         assertEquals(1, entries.size());
413         assertEquals(messageNr + 2, entries.get(0).getIndex());
414     }
415
416     @Test
417     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
418         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
419
420         MockRaftActorContext actorContext = createActorContextWithFollower();
421         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
422             @Override
423             public FiniteDuration getHeartBeatInterval() {
424                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
425             }
426         });
427
428         long term = 1;
429         actorContext.getTermInformation().update(term, "");
430
431         leader = new Leader(actorContext);
432
433         // Leader will send an immediate heartbeat - ignore it.
434         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
435
436         // The follower would normally reply - simulate that explicitly here.
437         long lastIndex = actorContext.getReplicatedLog().lastIndex();
438         leader.handleMessage(followerActor, new AppendEntriesReply(
439                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
440         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
441
442         followerActor.underlyingActor().clear();
443
444         sendReplicate(actorContext, lastIndex + 1);
445
446         // Wait slightly longer than heartbeat duration
447         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
448
449         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
450
451         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
452         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
453
454         assertEquals(1, allMessages.get(0).getEntries().size());
455         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
456         assertEquals(1, allMessages.get(1).getEntries().size());
457         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
458
459     }
460
461     @Test
462     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
463         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
464
465         MockRaftActorContext actorContext = createActorContextWithFollower();
466         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
467             @Override
468             public FiniteDuration getHeartBeatInterval() {
469                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
470             }
471         });
472
473         long term = 1;
474         actorContext.getTermInformation().update(term, "");
475
476         leader = new Leader(actorContext);
477
478         // Leader will send an immediate heartbeat - ignore it.
479         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
480
481         // The follower would normally reply - simulate that explicitly here.
482         long lastIndex = actorContext.getReplicatedLog().lastIndex();
483         leader.handleMessage(followerActor, new AppendEntriesReply(
484                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
485         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
486
487         followerActor.underlyingActor().clear();
488
489         for (int i = 0; i < 3; i++) {
490             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
491             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
492         }
493
494         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
495         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
496     }
497
498     @Test
499     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
500         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
501
502         MockRaftActorContext actorContext = createActorContextWithFollower();
503         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
504             @Override
505             public FiniteDuration getHeartBeatInterval() {
506                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
507             }
508         });
509
510         long term = 1;
511         actorContext.getTermInformation().update(term, "");
512
513         leader = new Leader(actorContext);
514
515         // Leader will send an immediate heartbeat - ignore it.
516         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
517
518         // The follower would normally reply - simulate that explicitly here.
519         long lastIndex = actorContext.getReplicatedLog().lastIndex();
520         leader.handleMessage(followerActor, new AppendEntriesReply(
521                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
522         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
523
524         followerActor.underlyingActor().clear();
525
526         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
527         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
528         sendReplicate(actorContext, lastIndex + 1);
529
530         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
531         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
532
533         assertEquals(0, allMessages.get(0).getEntries().size());
534         assertEquals(1, allMessages.get(1).getEntries().size());
535     }
536
537
538     @Test
539     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
540         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
541
542         MockRaftActorContext actorContext = createActorContext();
543
544         leader = new Leader(actorContext);
545
546         actorContext.setLastApplied(0);
547
548         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
549         long term = actorContext.getTermInformation().getCurrentTerm();
550         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
551                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
552
553         actorContext.getReplicatedLog().append(newEntry);
554
555         final Identifier id = new MockIdentifier("state-id");
556         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
557                 new Replicate(leaderActor, id, newEntry, true));
558
559         // State should not change
560         assertTrue(raftBehavior instanceof Leader);
561
562         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
563
564         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
565         // one since lastApplied state is 0.
566         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
567                 leaderActor, ApplyState.class);
568         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
569
570         for (int i = 0; i <= newLogIndex - 1; i++) {
571             ApplyState applyState = applyStateList.get(i);
572             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
573             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
574         }
575
576         ApplyState last = applyStateList.get((int) newLogIndex - 1);
577         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
578         assertEquals("getIdentifier", id, last.getIdentifier());
579     }
580
581     @Test
582     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
583         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
584
585         final MockRaftActorContext actorContext = createActorContextWithFollower();
586
587         //clears leaders log
588         actorContext.getReplicatedLog().removeFrom(0);
589
590         final int commitIndex = 3;
591         final int snapshotIndex = 2;
592         final int snapshotTerm = 1;
593
594         // set the snapshot variables in replicatedlog
595         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
596         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
597         actorContext.setCommitIndex(commitIndex);
598         //set follower timeout to 2 mins, helps during debugging
599         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
600
601         leader = new Leader(actorContext);
602
603         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
604         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
605
606         //update follower timestamp
607         leader.markFollowerActive(FOLLOWER_ID);
608
609         ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
610         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
611                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
612                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
613         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
614                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
615         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
616         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
617
618         //send first chunk and no InstallSnapshotReply received yet
619         fts.getNextChunk();
620         fts.incrementChunkIndex();
621
622         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
623                 TimeUnit.MILLISECONDS);
624
625         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
626
627         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
628
629         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
630
631         //InstallSnapshotReply received
632         fts.markSendStatus(true);
633
634         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
635
636         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
637
638         assertEquals(commitIndex, is.getLastIncludedIndex());
639     }
640
641     @Test
642     public void testSendAppendEntriesSnapshotScenario() {
643         logStart("testSendAppendEntriesSnapshotScenario");
644
645         final MockRaftActorContext actorContext = createActorContextWithFollower();
646
647         Map<String, String> leadersSnapshot = new HashMap<>();
648         leadersSnapshot.put("1", "A");
649         leadersSnapshot.put("2", "B");
650         leadersSnapshot.put("3", "C");
651
652         //clears leaders log
653         actorContext.getReplicatedLog().removeFrom(0);
654
655         final int followersLastIndex = 2;
656         final int snapshotIndex = 3;
657         final int newEntryIndex = 4;
658         final int snapshotTerm = 1;
659         final int currentTerm = 2;
660
661         // set the snapshot variables in replicatedlog
662         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
663         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
664         actorContext.setCommitIndex(followersLastIndex);
665
666         leader = new Leader(actorContext);
667
668         // Leader will send an immediate heartbeat - ignore it.
669         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
670
671         // new entry
672         SimpleReplicatedLogEntry entry =
673                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
674                         new MockRaftActorContext.MockPayload("D"));
675
676         actorContext.getReplicatedLog().append(entry);
677
678         //update follower timestamp
679         leader.markFollowerActive(FOLLOWER_ID);
680
681         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
682         RaftActorBehavior raftBehavior = leader.handleMessage(
683                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
684
685         assertTrue(raftBehavior instanceof Leader);
686
687         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
688     }
689
690     @Test
691     public void testInitiateInstallSnapshot() {
692         logStart("testInitiateInstallSnapshot");
693
694         MockRaftActorContext actorContext = createActorContextWithFollower();
695
696         //clears leaders log
697         actorContext.getReplicatedLog().removeFrom(0);
698
699         final int followersLastIndex = 2;
700         final int snapshotIndex = 3;
701         final int newEntryIndex = 4;
702         final int snapshotTerm = 1;
703         final int currentTerm = 2;
704
705         // set the snapshot variables in replicatedlog
706         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
707         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
708         actorContext.setLastApplied(3);
709         actorContext.setCommitIndex(followersLastIndex);
710
711         leader = new Leader(actorContext);
712
713         // Leader will send an immediate heartbeat - ignore it.
714         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
715
716         // set the snapshot as absent and check if capture-snapshot is invoked.
717         leader.setSnapshotHolder(null);
718
719         // new entry
720         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
721                 new MockRaftActorContext.MockPayload("D"));
722
723         actorContext.getReplicatedLog().append(entry);
724
725         //update follower timestamp
726         leader.markFollowerActive(FOLLOWER_ID);
727
728         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
729
730         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
731
732         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
733
734         assertEquals(3, cs.getLastAppliedIndex());
735         assertEquals(1, cs.getLastAppliedTerm());
736         assertEquals(4, cs.getLastIndex());
737         assertEquals(2, cs.getLastTerm());
738
739         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
740         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
741
742         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
743     }
744
745     @Test
746     public void testInitiateForceInstallSnapshot() throws Exception {
747         logStart("testInitiateForceInstallSnapshot");
748
749         MockRaftActorContext actorContext = createActorContextWithFollower();
750
751         final int followersLastIndex = 2;
752         final int snapshotIndex = -1;
753         final int newEntryIndex = 4;
754         final int snapshotTerm = -1;
755         final int currentTerm = 2;
756
757         // set the snapshot variables in replicatedlog
758         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
759         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
760         actorContext.setLastApplied(3);
761         actorContext.setCommitIndex(followersLastIndex);
762
763         actorContext.getReplicatedLog().removeFrom(0);
764
765         AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
766         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
767
768         leader = new Leader(actorContext);
769         actorContext.setCurrentBehavior(leader);
770
771         // Leader will send an immediate heartbeat - ignore it.
772         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
773
774         // set the snapshot as absent and check if capture-snapshot is invoked.
775         leader.setSnapshotHolder(null);
776
777         for (int i = 0; i < 4; i++) {
778             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
779                     new MockRaftActorContext.MockPayload("X" + i)));
780         }
781
782         // new entry
783         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
784                 new MockRaftActorContext.MockPayload("D"));
785
786         actorContext.getReplicatedLog().append(entry);
787
788         //update follower timestamp
789         leader.markFollowerActive(FOLLOWER_ID);
790
791         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
792         // installed with a SendInstallSnapshot
793         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
794                 RaftVersions.CURRENT_VERSION));
795
796         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
797
798         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
799         assertEquals(3, cs.getLastAppliedIndex());
800         assertEquals(1, cs.getLastAppliedTerm());
801         assertEquals(4, cs.getLastIndex());
802         assertEquals(2, cs.getLastTerm());
803
804         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
805         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
806
807         MessageCollectorActor.clearMessages(followerActor);
808
809         // Sending Replicate message should not initiate another capture since the first is in progress.
810         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
811         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
812
813         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
814         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
815                 RaftVersions.CURRENT_VERSION));
816         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
817
818         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
819         final byte[] bytes = new byte[]{1, 2, 3};
820         installSnapshotStream.get().get().write(bytes);
821         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
822                 Runtime.getRuntime().totalMemory());
823         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
824
825         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
826         MessageCollectorActor.clearMessages(followerActor);
827         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
828                 RaftVersions.CURRENT_VERSION));
829         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
830     }
831
832
833     @Test
834     public void testInstallSnapshot() {
835         logStart("testInstallSnapshot");
836
837         final MockRaftActorContext actorContext = createActorContextWithFollower();
838
839         Map<String, String> leadersSnapshot = new HashMap<>();
840         leadersSnapshot.put("1", "A");
841         leadersSnapshot.put("2", "B");
842         leadersSnapshot.put("3", "C");
843
844         //clears leaders log
845         actorContext.getReplicatedLog().removeFrom(0);
846
847         final int lastAppliedIndex = 3;
848         final int snapshotIndex = 2;
849         final int snapshotTerm = 1;
850         final int currentTerm = 2;
851
852         // set the snapshot variables in replicatedlog
853         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
854         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
855         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
856         actorContext.setCommitIndex(lastAppliedIndex);
857         actorContext.setLastApplied(lastAppliedIndex);
858
859         leader = new Leader(actorContext);
860
861         // Initial heartbeat.
862         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
863
864         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
865         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
866
867         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
868         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
869                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
870
871         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
872                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
873
874         assertTrue(raftBehavior instanceof Leader);
875
876         // check if installsnapshot gets called with the correct values.
877
878         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
879                 InstallSnapshot.class);
880
881         assertNotNull(installSnapshot.getData());
882         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
883         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
884
885         assertEquals(currentTerm, installSnapshot.getTerm());
886     }
887
888     @Test
889     public void testForceInstallSnapshot() {
890         logStart("testForceInstallSnapshot");
891
892         final MockRaftActorContext actorContext = createActorContextWithFollower();
893
894         Map<String, String> leadersSnapshot = new HashMap<>();
895         leadersSnapshot.put("1", "A");
896         leadersSnapshot.put("2", "B");
897         leadersSnapshot.put("3", "C");
898
899         final int lastAppliedIndex = 3;
900         final int snapshotIndex = -1;
901         final int snapshotTerm = -1;
902         final int currentTerm = 2;
903
904         // set the snapshot variables in replicatedlog
905         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
906         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
907         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
908         actorContext.setCommitIndex(lastAppliedIndex);
909         actorContext.setLastApplied(lastAppliedIndex);
910
911         leader = new Leader(actorContext);
912
913         // Initial heartbeat.
914         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
915
916         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
917         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
918
919         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
920         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
921                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
922
923         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
924                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
925
926         assertTrue(raftBehavior instanceof Leader);
927
928         // check if installsnapshot gets called with the correct values.
929
930         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
931                 InstallSnapshot.class);
932
933         assertNotNull(installSnapshot.getData());
934         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
935         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
936
937         assertEquals(currentTerm, installSnapshot.getTerm());
938     }
939
940     @Test
941     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
942         logStart("testHandleInstallSnapshotReplyLastChunk");
943
944         MockRaftActorContext actorContext = createActorContextWithFollower();
945
946         final int commitIndex = 3;
947         final int snapshotIndex = 2;
948         final int snapshotTerm = 1;
949         final int currentTerm = 2;
950
951         actorContext.setCommitIndex(commitIndex);
952
953         leader = new Leader(actorContext);
954         actorContext.setCurrentBehavior(leader);
955
956         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
957         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
958
959         // Ignore initial heartbeat.
960         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
961
962         Map<String, String> leadersSnapshot = new HashMap<>();
963         leadersSnapshot.put("1", "A");
964         leadersSnapshot.put("2", "B");
965         leadersSnapshot.put("3", "C");
966
967         // set the snapshot variables in replicatedlog
968
969         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
970         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
971         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
972
973         ByteString bs = toByteString(leadersSnapshot);
974         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
975                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
976                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
977         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
978                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
979         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
980         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
981         while (!fts.isLastChunk(fts.getChunkIndex())) {
982             fts.getNextChunk();
983             fts.incrementChunkIndex();
984         }
985
986         //clears leaders log
987         actorContext.getReplicatedLog().removeFrom(0);
988
989         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
990                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
991
992         assertTrue(raftBehavior instanceof Leader);
993
994         assertEquals(1, leader.followerLogSize());
995         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
996         assertNotNull(fli);
997         assertNull(fli.getInstallSnapshotState());
998         assertEquals(commitIndex, fli.getMatchIndex());
999         assertEquals(commitIndex + 1, fli.getNextIndex());
1000         assertFalse(leader.hasSnapshot());
1001     }
1002
1003     @Test
1004     public void testSendSnapshotfromInstallSnapshotReply() {
1005         logStart("testSendSnapshotfromInstallSnapshotReply");
1006
1007         MockRaftActorContext actorContext = createActorContextWithFollower();
1008
1009         final int commitIndex = 3;
1010         final int snapshotIndex = 2;
1011         final int snapshotTerm = 1;
1012         final int currentTerm = 2;
1013
1014         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1015             @Override
1016             public int getSnapshotChunkSize() {
1017                 return 50;
1018             }
1019         };
1020         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1021         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1022
1023         actorContext.setConfigParams(configParams);
1024         actorContext.setCommitIndex(commitIndex);
1025
1026         leader = new Leader(actorContext);
1027         actorContext.setCurrentBehavior(leader);
1028
1029         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1030         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1031
1032         Map<String, String> leadersSnapshot = new HashMap<>();
1033         leadersSnapshot.put("1", "A");
1034         leadersSnapshot.put("2", "B");
1035         leadersSnapshot.put("3", "C");
1036
1037         // set the snapshot variables in replicatedlog
1038         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1039         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1040         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1041
1042         ByteString bs = toByteString(leadersSnapshot);
1043         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1044                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1045
1046         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1047
1048         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1049                 InstallSnapshot.class);
1050
1051         assertEquals(1, installSnapshot.getChunkIndex());
1052         assertEquals(3, installSnapshot.getTotalChunks());
1053
1054         followerActor.underlyingActor().clear();
1055         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1056                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1057
1058         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1059
1060         assertEquals(2, installSnapshot.getChunkIndex());
1061         assertEquals(3, installSnapshot.getTotalChunks());
1062
1063         followerActor.underlyingActor().clear();
1064         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1065                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1066
1067         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1068
1069         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1070         followerActor.underlyingActor().clear();
1071         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1072                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1073
1074         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1075
1076         assertNull(installSnapshot);
1077     }
1078
1079
1080     @Test
1081     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1082         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1083
1084         MockRaftActorContext actorContext = createActorContextWithFollower();
1085
1086         final int commitIndex = 3;
1087         final int snapshotIndex = 2;
1088         final int snapshotTerm = 1;
1089         final int currentTerm = 2;
1090
1091         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1092             @Override
1093             public int getSnapshotChunkSize() {
1094                 return 50;
1095             }
1096         });
1097
1098         actorContext.setCommitIndex(commitIndex);
1099
1100         leader = new Leader(actorContext);
1101
1102         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1103         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1104
1105         Map<String, String> leadersSnapshot = new HashMap<>();
1106         leadersSnapshot.put("1", "A");
1107         leadersSnapshot.put("2", "B");
1108         leadersSnapshot.put("3", "C");
1109
1110         // set the snapshot variables in replicatedlog
1111         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1112         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1113         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1114
1115         ByteString bs = toByteString(leadersSnapshot);
1116         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1117                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1118
1119         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1120         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1121
1122         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1123                 InstallSnapshot.class);
1124
1125         assertEquals(1, installSnapshot.getChunkIndex());
1126         assertEquals(3, installSnapshot.getTotalChunks());
1127
1128         followerActor.underlyingActor().clear();
1129
1130         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1131                 FOLLOWER_ID, -1, false));
1132
1133         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1134                 TimeUnit.MILLISECONDS);
1135
1136         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1137
1138         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1139
1140         assertEquals(1, installSnapshot.getChunkIndex());
1141         assertEquals(3, installSnapshot.getTotalChunks());
1142     }
1143
1144     @Test
1145     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1146         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1147
1148         MockRaftActorContext actorContext = createActorContextWithFollower();
1149
1150         final int commitIndex = 3;
1151         final int snapshotIndex = 2;
1152         final int snapshotTerm = 1;
1153         final int currentTerm = 2;
1154
1155         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1156             @Override
1157             public int getSnapshotChunkSize() {
1158                 return 50;
1159             }
1160         });
1161
1162         actorContext.setCommitIndex(commitIndex);
1163
1164         leader = new Leader(actorContext);
1165
1166         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1167         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1168
1169         Map<String, String> leadersSnapshot = new HashMap<>();
1170         leadersSnapshot.put("1", "A");
1171         leadersSnapshot.put("2", "B");
1172         leadersSnapshot.put("3", "C");
1173
1174         // set the snapshot variables in replicatedlog
1175         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1176         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1177         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1178
1179         ByteString bs = toByteString(leadersSnapshot);
1180         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1181                 List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
1182
1183         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1184
1185         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1186                 InstallSnapshot.class);
1187
1188         assertEquals(1, installSnapshot.getChunkIndex());
1189         assertEquals(3, installSnapshot.getTotalChunks());
1190         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1191                 installSnapshot.getLastChunkHashCode().getAsInt());
1192
1193         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1194
1195         followerActor.underlyingActor().clear();
1196
1197         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1198                 FOLLOWER_ID, 1, true));
1199
1200         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1201
1202         assertEquals(2, installSnapshot.getChunkIndex());
1203         assertEquals(3, installSnapshot.getTotalChunks());
1204         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
1205     }
1206
1207     @Test
1208     public void testLeaderInstallSnapshotState() throws IOException {
1209         logStart("testLeaderInstallSnapshotState");
1210
1211         Map<String, String> leadersSnapshot = new HashMap<>();
1212         leadersSnapshot.put("1", "A");
1213         leadersSnapshot.put("2", "B");
1214         leadersSnapshot.put("3", "C");
1215
1216         ByteString bs = toByteString(leadersSnapshot);
1217         byte[] barray = bs.toByteArray();
1218
1219         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1220         fts.setSnapshotBytes(ByteSource.wrap(barray));
1221
1222         assertEquals(bs.size(), barray.length);
1223
1224         int chunkIndex = 0;
1225         for (int i = 0; i < barray.length; i = i + 50) {
1226             int length = i + 50;
1227             chunkIndex++;
1228
1229             if (i + 50 > barray.length) {
1230                 length = barray.length;
1231             }
1232
1233             byte[] chunk = fts.getNextChunk();
1234             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1235             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1236
1237             fts.markSendStatus(true);
1238             if (!fts.isLastChunk(chunkIndex)) {
1239                 fts.incrementChunkIndex();
1240             }
1241         }
1242
1243         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1244         fts.close();
1245     }
1246
1247     @Override
1248     protected Leader createBehavior(final RaftActorContext actorContext) {
1249         return new Leader(actorContext);
1250     }
1251
1252     @Override
1253     protected MockRaftActorContext createActorContext() {
1254         return createActorContext(leaderActor);
1255     }
1256
1257     @Override
1258     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1259         return createActorContext(LEADER_ID, actorRef);
1260     }
1261
1262     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1263         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1264         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1265         configParams.setElectionTimeoutFactor(100000);
1266         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1267         context.setConfigParams(configParams);
1268         context.setPayloadVersion(payloadVersion);
1269         return context;
1270     }
1271
1272     private MockRaftActorContext createActorContextWithFollower() {
1273         MockRaftActorContext actorContext = createActorContext();
1274         actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
1275         return actorContext;
1276     }
1277
1278     private MockRaftActorContext createFollowerActorContextWithLeader() {
1279         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1280         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1281         followerConfig.setElectionTimeoutFactor(10000);
1282         followerActorContext.setConfigParams(followerConfig);
1283         followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1284         return followerActorContext;
1285     }
1286
1287     @Test
1288     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1289         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1290
1291         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1292
1293         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1294
1295         Follower follower = new Follower(followerActorContext);
1296         followerActor.underlyingActor().setBehavior(follower);
1297         followerActorContext.setCurrentBehavior(follower);
1298
1299         Map<String, String> peerAddresses = new HashMap<>();
1300         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1301
1302         leaderActorContext.setPeerAddresses(peerAddresses);
1303
1304         leaderActorContext.getReplicatedLog().removeFrom(0);
1305
1306         //create 3 entries
1307         leaderActorContext.setReplicatedLog(
1308                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309
1310         leaderActorContext.setCommitIndex(1);
1311
1312         followerActorContext.getReplicatedLog().removeFrom(0);
1313
1314         // follower too has the exact same log entries and has the same commit index
1315         followerActorContext.setReplicatedLog(
1316                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1317
1318         followerActorContext.setCommitIndex(1);
1319
1320         leader = new Leader(leaderActorContext);
1321         leaderActorContext.setCurrentBehavior(leader);
1322
1323         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1324
1325         assertEquals(-1, appendEntries.getLeaderCommit());
1326         assertEquals(0, appendEntries.getEntries().size());
1327         assertEquals(0, appendEntries.getPrevLogIndex());
1328
1329         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1330                 leaderActor, AppendEntriesReply.class);
1331
1332         assertEquals(2, appendEntriesReply.getLogLastIndex());
1333         assertEquals(1, appendEntriesReply.getLogLastTerm());
1334
1335         // follower returns its next index
1336         assertEquals(2, appendEntriesReply.getLogLastIndex());
1337         assertEquals(1, appendEntriesReply.getLogLastTerm());
1338
1339         follower.close();
1340     }
1341
1342     @Test
1343     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1344         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1345
1346         final MockRaftActorContext leaderActorContext = createActorContext();
1347
1348         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1349         followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
1350
1351         Follower follower = new Follower(followerActorContext);
1352         followerActor.underlyingActor().setBehavior(follower);
1353         followerActorContext.setCurrentBehavior(follower);
1354
1355         Map<String, String> leaderPeerAddresses = new HashMap<>();
1356         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1357
1358         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1359
1360         leaderActorContext.getReplicatedLog().removeFrom(0);
1361
1362         leaderActorContext.setReplicatedLog(
1363                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1364
1365         leaderActorContext.setCommitIndex(1);
1366
1367         followerActorContext.getReplicatedLog().removeFrom(0);
1368
1369         followerActorContext.setReplicatedLog(
1370                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1371
1372         // follower has the same log entries but its commit index > leaders commit index
1373         followerActorContext.setCommitIndex(2);
1374
1375         leader = new Leader(leaderActorContext);
1376
1377         // Initial heartbeat
1378         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1379
1380         assertEquals(-1, appendEntries.getLeaderCommit());
1381         assertEquals(0, appendEntries.getEntries().size());
1382         assertEquals(0, appendEntries.getPrevLogIndex());
1383
1384         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1385                 leaderActor, AppendEntriesReply.class);
1386
1387         assertEquals(2, appendEntriesReply.getLogLastIndex());
1388         assertEquals(1, appendEntriesReply.getLogLastTerm());
1389
1390         leaderActor.underlyingActor().setBehavior(follower);
1391         leader.handleMessage(followerActor, appendEntriesReply);
1392
1393         leaderActor.underlyingActor().clear();
1394         followerActor.underlyingActor().clear();
1395
1396         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1397                 TimeUnit.MILLISECONDS);
1398
1399         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1400
1401         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1402
1403         assertEquals(2, appendEntries.getLeaderCommit());
1404         assertEquals(0, appendEntries.getEntries().size());
1405         assertEquals(2, appendEntries.getPrevLogIndex());
1406
1407         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1408
1409         assertEquals(2, appendEntriesReply.getLogLastIndex());
1410         assertEquals(1, appendEntriesReply.getLogLastTerm());
1411
1412         assertEquals(2, followerActorContext.getCommitIndex());
1413
1414         follower.close();
1415     }
1416
1417     @Test
1418     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1419         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1420
1421         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1422         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1423                 new FiniteDuration(1000, TimeUnit.SECONDS));
1424
1425         leaderActorContext.setReplicatedLog(
1426                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1427         long leaderCommitIndex = 2;
1428         leaderActorContext.setCommitIndex(leaderCommitIndex);
1429         leaderActorContext.setLastApplied(leaderCommitIndex);
1430
1431         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1432         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1433
1434         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1435
1436         followerActorContext.setReplicatedLog(
1437                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1438         followerActorContext.setCommitIndex(0);
1439         followerActorContext.setLastApplied(0);
1440
1441         Follower follower = new Follower(followerActorContext);
1442         followerActor.underlyingActor().setBehavior(follower);
1443
1444         leader = new Leader(leaderActorContext);
1445
1446         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1447         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1448                 AppendEntriesReply.class);
1449
1450         MessageCollectorActor.clearMessages(followerActor);
1451         MessageCollectorActor.clearMessages(leaderActor);
1452
1453         // Verify initial AppendEntries sent.
1454         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1455         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1456         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1457
1458         leaderActor.underlyingActor().setBehavior(leader);
1459
1460         leader.handleMessage(followerActor, appendEntriesReply);
1461
1462         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1463         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1464
1465         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1466         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1467         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1468
1469         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1470         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1471                 appendEntries.getEntries().get(0).getData());
1472         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1473         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1474                 appendEntries.getEntries().get(1).getData());
1475
1476         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1477         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1478
1479         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1480
1481         ApplyState applyState = applyStateList.get(0);
1482         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1483         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1484         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1485                 applyState.getReplicatedLogEntry().getData());
1486
1487         applyState = applyStateList.get(1);
1488         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1489         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1490         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1491                 applyState.getReplicatedLogEntry().getData());
1492
1493         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1494         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1495     }
1496
1497     @Test
1498     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1499         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1500
1501         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1502         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1503                 new FiniteDuration(1000, TimeUnit.SECONDS));
1504
1505         leaderActorContext.setReplicatedLog(
1506                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1507         long leaderCommitIndex = 1;
1508         leaderActorContext.setCommitIndex(leaderCommitIndex);
1509         leaderActorContext.setLastApplied(leaderCommitIndex);
1510
1511         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1512         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1513
1514         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1515
1516         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1517         followerActorContext.setCommitIndex(-1);
1518         followerActorContext.setLastApplied(-1);
1519
1520         Follower follower = new Follower(followerActorContext);
1521         followerActor.underlyingActor().setBehavior(follower);
1522         followerActorContext.setCurrentBehavior(follower);
1523
1524         leader = new Leader(leaderActorContext);
1525
1526         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1527         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1528                 AppendEntriesReply.class);
1529
1530         MessageCollectorActor.clearMessages(followerActor);
1531         MessageCollectorActor.clearMessages(leaderActor);
1532
1533         // Verify initial AppendEntries sent with the leader's current commit index.
1534         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1535         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1536         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1537
1538         leaderActor.underlyingActor().setBehavior(leader);
1539         leaderActorContext.setCurrentBehavior(leader);
1540
1541         leader.handleMessage(followerActor, appendEntriesReply);
1542
1543         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1544         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1545
1546         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1547         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1548         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1549
1550         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1551         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1552                 appendEntries.getEntries().get(0).getData());
1553         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1554         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1555                 appendEntries.getEntries().get(1).getData());
1556
1557         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1558         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1559
1560         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1561
1562         ApplyState applyState = applyStateList.get(0);
1563         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1564         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1565         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1566                 applyState.getReplicatedLogEntry().getData());
1567
1568         applyState = applyStateList.get(1);
1569         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1570         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1571         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1572                 applyState.getReplicatedLogEntry().getData());
1573
1574         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1575         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1576     }
1577
1578     @Test
1579     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1580         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1581
1582         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1583         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1584                 new FiniteDuration(1000, TimeUnit.SECONDS));
1585
1586         leaderActorContext.setReplicatedLog(
1587                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1588         long leaderCommitIndex = 1;
1589         leaderActorContext.setCommitIndex(leaderCommitIndex);
1590         leaderActorContext.setLastApplied(leaderCommitIndex);
1591
1592         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1593         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1594
1595         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1596
1597         followerActorContext.setReplicatedLog(
1598                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1599         followerActorContext.setCommitIndex(-1);
1600         followerActorContext.setLastApplied(-1);
1601
1602         Follower follower = new Follower(followerActorContext);
1603         followerActor.underlyingActor().setBehavior(follower);
1604         followerActorContext.setCurrentBehavior(follower);
1605
1606         leader = new Leader(leaderActorContext);
1607
1608         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1609         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1610                 AppendEntriesReply.class);
1611
1612         MessageCollectorActor.clearMessages(followerActor);
1613         MessageCollectorActor.clearMessages(leaderActor);
1614
1615         // Verify initial AppendEntries sent with the leader's current commit index.
1616         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1617         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1618         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1619
1620         leaderActor.underlyingActor().setBehavior(leader);
1621         leaderActorContext.setCurrentBehavior(leader);
1622
1623         leader.handleMessage(followerActor, appendEntriesReply);
1624
1625         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1626         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1627
1628         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1629         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1630         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1631
1632         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1633         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1634         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1635                 appendEntries.getEntries().get(0).getData());
1636         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1637         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1638         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1639                 appendEntries.getEntries().get(1).getData());
1640
1641         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1642         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1643
1644         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1645
1646         ApplyState applyState = applyStateList.get(0);
1647         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1648         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1649         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1650                 applyState.getReplicatedLogEntry().getData());
1651
1652         applyState = applyStateList.get(1);
1653         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1654         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1655         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1656                 applyState.getReplicatedLogEntry().getData());
1657
1658         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1659         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1660         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1661     }
1662
1663     @Test
1664     public void testHandleAppendEntriesReplyWithNewerTerm() {
1665         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1666
1667         MockRaftActorContext leaderActorContext = createActorContext();
1668         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1669                 new FiniteDuration(10000, TimeUnit.SECONDS));
1670
1671         leaderActorContext.setReplicatedLog(
1672                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1673
1674         leader = new Leader(leaderActorContext);
1675         leaderActor.underlyingActor().setBehavior(leader);
1676         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1677
1678         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1679                 AppendEntriesReply.class);
1680
1681         assertEquals(false, appendEntriesReply.isSuccess());
1682         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1683
1684         MessageCollectorActor.clearMessages(leaderActor);
1685     }
1686
1687     @Test
1688     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1689         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1690
1691         MockRaftActorContext leaderActorContext = createActorContext();
1692         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1693                 new FiniteDuration(10000, TimeUnit.SECONDS));
1694
1695         leaderActorContext.setReplicatedLog(
1696                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1697         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1698
1699         leader = new Leader(leaderActorContext);
1700         leaderActor.underlyingActor().setBehavior(leader);
1701         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1702
1703         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1704                 AppendEntriesReply.class);
1705
1706         assertEquals(false, appendEntriesReply.isSuccess());
1707         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1708
1709         MessageCollectorActor.clearMessages(leaderActor);
1710     }
1711
1712     @Test
1713     public void testHandleAppendEntriesReplySuccess() {
1714         logStart("testHandleAppendEntriesReplySuccess");
1715
1716         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1717
1718         leaderActorContext.setReplicatedLog(
1719                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1720
1721         leaderActorContext.setCommitIndex(1);
1722         leaderActorContext.setLastApplied(1);
1723         leaderActorContext.getTermInformation().update(1, "leader");
1724
1725         leader = new Leader(leaderActorContext);
1726
1727         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1728
1729         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1730         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1731
1732         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1733
1734         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1735
1736         assertEquals(RaftState.Leader, raftActorBehavior.state());
1737
1738         assertEquals(2, leaderActorContext.getCommitIndex());
1739
1740         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1741                 leaderActor, ApplyJournalEntries.class);
1742
1743         assertEquals(2, leaderActorContext.getLastApplied());
1744
1745         assertEquals(2, applyJournalEntries.getToIndex());
1746
1747         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1748                 ApplyState.class);
1749
1750         assertEquals(1,applyStateList.size());
1751
1752         ApplyState applyState = applyStateList.get(0);
1753
1754         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1755
1756         assertEquals(2, followerInfo.getMatchIndex());
1757         assertEquals(3, followerInfo.getNextIndex());
1758         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1759         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1760     }
1761
1762     @Test
1763     public void testHandleAppendEntriesReplyUnknownFollower() {
1764         logStart("testHandleAppendEntriesReplyUnknownFollower");
1765
1766         MockRaftActorContext leaderActorContext = createActorContext();
1767
1768         leader = new Leader(leaderActorContext);
1769
1770         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1771
1772         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1773
1774         assertEquals(RaftState.Leader, raftActorBehavior.state());
1775     }
1776
1777     @Test
1778     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1779         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1780
1781         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1782         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1783                 new FiniteDuration(1000, TimeUnit.SECONDS));
1784         // Note: the size here depends on estimate
1785         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1786
1787         leaderActorContext.setReplicatedLog(
1788                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1789         long leaderCommitIndex = 3;
1790         leaderActorContext.setCommitIndex(leaderCommitIndex);
1791         leaderActorContext.setLastApplied(leaderCommitIndex);
1792
1793         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1794         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1795         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1796         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1797
1798         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1799
1800         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1801         followerActorContext.setCommitIndex(-1);
1802         followerActorContext.setLastApplied(-1);
1803
1804         Follower follower = new Follower(followerActorContext);
1805         followerActor.underlyingActor().setBehavior(follower);
1806         followerActorContext.setCurrentBehavior(follower);
1807
1808         leader = new Leader(leaderActorContext);
1809
1810         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1811         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1812                 AppendEntriesReply.class);
1813
1814         MessageCollectorActor.clearMessages(followerActor);
1815         MessageCollectorActor.clearMessages(leaderActor);
1816
1817         // Verify initial AppendEntries sent with the leader's current commit index.
1818         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1819         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1820         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1821
1822         leaderActor.underlyingActor().setBehavior(leader);
1823         leaderActorContext.setCurrentBehavior(leader);
1824
1825         leader.handleMessage(followerActor, appendEntriesReply);
1826
1827         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1828                 AppendEntries.class, 2);
1829         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1830
1831         appendEntries = appendEntriesList.get(0);
1832         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1833         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1834         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1835
1836         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1837         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1838                 appendEntries.getEntries().get(0).getData());
1839         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1840         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1841                 appendEntries.getEntries().get(1).getData());
1842
1843         appendEntries = appendEntriesList.get(1);
1844         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1845         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1846         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1847
1848         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1849         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1850                 appendEntries.getEntries().get(0).getData());
1851         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1852         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1853                 appendEntries.getEntries().get(1).getData());
1854
1855         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1856         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1857
1858         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1859
1860         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1861         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1862     }
1863
1864     @Test
1865     public void testHandleRequestVoteReply() {
1866         logStart("testHandleRequestVoteReply");
1867
1868         MockRaftActorContext leaderActorContext = createActorContext();
1869
1870         leader = new Leader(leaderActorContext);
1871
1872         // Should be a no-op.
1873         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1874                 new RequestVoteReply(1, true));
1875
1876         assertEquals(RaftState.Leader, raftActorBehavior.state());
1877
1878         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1879
1880         assertEquals(RaftState.Leader, raftActorBehavior.state());
1881     }
1882
1883     @Test
1884     public void testIsolatedLeaderCheckNoFollowers() {
1885         logStart("testIsolatedLeaderCheckNoFollowers");
1886
1887         MockRaftActorContext leaderActorContext = createActorContext();
1888
1889         leader = new Leader(leaderActorContext);
1890         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1891         assertTrue(newBehavior instanceof Leader);
1892     }
1893
1894     @Test
1895     public void testIsolatedLeaderCheckNoVotingFollowers() {
1896         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1897
1898         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1899         Follower follower = new Follower(followerActorContext);
1900         followerActor.underlyingActor().setBehavior(follower);
1901
1902         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1903         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1904                 new FiniteDuration(1000, TimeUnit.SECONDS));
1905         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1906
1907         leader = new Leader(leaderActorContext);
1908         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1909         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1910         assertTrue("Expected Leader", newBehavior instanceof Leader);
1911     }
1912
1913     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1914         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1915         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1916
1917         MockRaftActorContext leaderActorContext = createActorContext();
1918
1919         Map<String, String> peerAddresses = new HashMap<>();
1920         peerAddresses.put("follower-1", followerActor1.path().toString());
1921         peerAddresses.put("follower-2", followerActor2.path().toString());
1922
1923         leaderActorContext.setPeerAddresses(peerAddresses);
1924         leaderActorContext.setRaftPolicy(raftPolicy);
1925
1926         leader = new Leader(leaderActorContext);
1927
1928         leader.markFollowerActive("follower-1");
1929         leader.markFollowerActive("follower-2");
1930         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1931         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1932
1933         // kill 1 follower and verify if that got killed
1934         final TestKit probe = new TestKit(getSystem());
1935         probe.watch(followerActor1);
1936         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1937         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1938         assertEquals(termMsg1.getActor(), followerActor1);
1939
1940         leader.markFollowerInActive("follower-1");
1941         leader.markFollowerActive("follower-2");
1942         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943         assertTrue("Behavior not instance of Leader when majority of followers are active",
1944                 newBehavior instanceof Leader);
1945
1946         // kill 2nd follower and leader should change to Isolated leader
1947         followerActor2.tell(PoisonPill.getInstance(), null);
1948         probe.watch(followerActor2);
1949         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1950         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1951         assertEquals(termMsg2.getActor(), followerActor2);
1952
1953         leader.markFollowerInActive("follower-2");
1954         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1955     }
1956
1957     @Test
1958     public void testIsolatedLeaderCheckTwoFollowers() {
1959         logStart("testIsolatedLeaderCheckTwoFollowers");
1960
1961         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1962
1963         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1964             newBehavior instanceof IsolatedLeader);
1965     }
1966
1967     @Test
1968     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1969         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1970
1971         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1972
1973         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1974                 newBehavior instanceof Leader);
1975     }
1976
1977     @Test
1978     public void testLaggingFollowerStarvation() {
1979         logStart("testLaggingFollowerStarvation");
1980
1981         String leaderActorId = actorFactory.generateActorId("leader");
1982         String follower1ActorId = actorFactory.generateActorId("follower");
1983         String follower2ActorId = actorFactory.generateActorId("follower");
1984
1985         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1986         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1987
1988         MockRaftActorContext leaderActorContext =
1989                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1990
1991         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1992         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1993         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1994
1995         leaderActorContext.setConfigParams(configParams);
1996
1997         leaderActorContext.setReplicatedLog(
1998                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1999
2000         Map<String, String> peerAddresses = new HashMap<>();
2001         peerAddresses.put(follower1ActorId,
2002                 follower1Actor.path().toString());
2003         peerAddresses.put(follower2ActorId,
2004                 follower2Actor.path().toString());
2005
2006         leaderActorContext.setPeerAddresses(peerAddresses);
2007         leaderActorContext.getTermInformation().update(1, leaderActorId);
2008
2009         leader = createBehavior(leaderActorContext);
2010
2011         leaderActor.underlyingActor().setBehavior(leader);
2012
2013         for (int i = 1; i < 6; i++) {
2014             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2015             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2016                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2017             assertTrue(newBehavior == leader);
2018             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2019         }
2020
2021         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2022         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2023
2024         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2025                 heartbeats.size() > 1);
2026
2027         // Check if follower-2 got AppendEntries during this time and was not starved
2028         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2029
2030         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2031                 appendEntries.size() > 1);
2032     }
2033
2034     @Test
2035     public void testReplicationConsensusWithNonVotingFollower() {
2036         logStart("testReplicationConsensusWithNonVotingFollower");
2037
2038         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2039         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2040                 new FiniteDuration(1000, TimeUnit.SECONDS));
2041
2042         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2043         leaderActorContext.setCommitIndex(-1);
2044         leaderActorContext.setLastApplied(-1);
2045
2046         String nonVotingFollowerId = "nonvoting-follower";
2047         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2048                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2049
2050         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2051                 VotingState.NON_VOTING);
2052
2053         leader = new Leader(leaderActorContext);
2054         leaderActorContext.setCurrentBehavior(leader);
2055
2056         // Ignore initial heartbeats
2057         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2058         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2059
2060         MessageCollectorActor.clearMessages(followerActor);
2061         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2062         MessageCollectorActor.clearMessages(leaderActor);
2063
2064         // Send a Replicate message and wait for AppendEntries.
2065         sendReplicate(leaderActorContext, 0);
2066
2067         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2068         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2069
2070         // Send reply only from the voting follower and verify consensus via ApplyState.
2071         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2072
2073         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2074
2075         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2076
2077         MessageCollectorActor.clearMessages(followerActor);
2078         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2079         MessageCollectorActor.clearMessages(leaderActor);
2080
2081         // Send another Replicate message
2082         sendReplicate(leaderActorContext, 1);
2083
2084         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2085         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2086                 AppendEntries.class);
2087         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2088         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2089
2090         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2091         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2092
2093         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2094
2095         // Send reply from the voting follower and verify consensus.
2096         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2097
2098         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2099     }
2100
2101     @Test
2102     public void testTransferLeadershipWithFollowerInSync() {
2103         logStart("testTransferLeadershipWithFollowerInSync");
2104
2105         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2106         leaderActorContext.setLastApplied(-1);
2107         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2108                 new FiniteDuration(1000, TimeUnit.SECONDS));
2109         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2110
2111         leader = new Leader(leaderActorContext);
2112         leaderActorContext.setCurrentBehavior(leader);
2113
2114         // Initial heartbeat
2115         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2116         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2117         MessageCollectorActor.clearMessages(followerActor);
2118
2119         sendReplicate(leaderActorContext, 0);
2120         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2121
2122         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2123         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2124         MessageCollectorActor.clearMessages(followerActor);
2125
2126         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2127         leader.transferLeadership(mockTransferCohort);
2128
2129         verify(mockTransferCohort, never()).transferComplete();
2130         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2131         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2132         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2133
2134         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2135         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2136
2137         // Leader should force an election timeout
2138         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2139
2140         verify(mockTransferCohort).transferComplete();
2141     }
2142
2143     @Test
2144     public void testTransferLeadershipWithEmptyLog() {
2145         logStart("testTransferLeadershipWithEmptyLog");
2146
2147         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2148         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2149                 new FiniteDuration(1000, TimeUnit.SECONDS));
2150         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2151
2152         leader = new Leader(leaderActorContext);
2153         leaderActorContext.setCurrentBehavior(leader);
2154
2155         // Initial heartbeat
2156         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2157         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2158         MessageCollectorActor.clearMessages(followerActor);
2159
2160         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2161         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2162         leader.transferLeadership(mockTransferCohort);
2163
2164         verify(mockTransferCohort, never()).transferComplete();
2165         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2166         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2167
2168         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2169         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2170
2171         // Leader should force an election timeout
2172         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2173
2174         verify(mockTransferCohort).transferComplete();
2175     }
2176
2177     @Test
2178     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2179         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2180
2181         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2182         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2183                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2184
2185         leader = new Leader(leaderActorContext);
2186         leaderActorContext.setCurrentBehavior(leader);
2187
2188         // Initial heartbeat
2189         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2190         MessageCollectorActor.clearMessages(followerActor);
2191
2192         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2193         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2194         leader.transferLeadership(mockTransferCohort);
2195
2196         verify(mockTransferCohort, never()).transferComplete();
2197
2198         // Sync up the follower.
2199         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2200         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2201         MessageCollectorActor.clearMessages(followerActor);
2202
2203         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2204                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2205         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2206         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2207         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2208
2209         // Leader should force an election timeout
2210         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2211
2212         verify(mockTransferCohort).transferComplete();
2213     }
2214
2215     @Test
2216     public void testTransferLeadershipWithFollowerSyncTimeout() {
2217         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2218
2219         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2220         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2221                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2222         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2223         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2224
2225         leader = new Leader(leaderActorContext);
2226         leaderActorContext.setCurrentBehavior(leader);
2227
2228         // Initial heartbeat
2229         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2230         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2231         MessageCollectorActor.clearMessages(followerActor);
2232
2233         sendReplicate(leaderActorContext, 0);
2234         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2235
2236         MessageCollectorActor.clearMessages(followerActor);
2237
2238         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2239         leader.transferLeadership(mockTransferCohort);
2240
2241         verify(mockTransferCohort, never()).transferComplete();
2242
2243         // Send heartbeats to time out the transfer.
2244         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2245             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2246                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2247             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2248         }
2249
2250         verify(mockTransferCohort).abortTransfer();
2251         verify(mockTransferCohort, never()).transferComplete();
2252         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2253     }
2254
2255     @Test
2256     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2257         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2258
2259         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2260                 List.of(new SimpleReplicatedLogEntry(0, 1,
2261                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2262         final MockRaftActorContext.MockPayload largePayload =
2263                 new MockRaftActorContext.MockPayload("large", serializedSize);
2264
2265         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2266         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2267                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2268         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2269         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2270         leaderActorContext.setCommitIndex(-1);
2271         leaderActorContext.setLastApplied(-1);
2272
2273         leader = new Leader(leaderActorContext);
2274         leaderActorContext.setCurrentBehavior(leader);
2275
2276         // Send initial heartbeat reply so follower is marked active
2277         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2278         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2279         MessageCollectorActor.clearMessages(followerActor);
2280
2281         // Send normal payload first to prime commit index.
2282         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2283         sendReplicate(leaderActorContext, term, 0);
2284
2285         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2286         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2287         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2288
2289         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2290         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2291         MessageCollectorActor.clearMessages(followerActor);
2292
2293         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2294         sendReplicate(leaderActorContext, term, 1, largePayload);
2295
2296         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2297         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2298         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2299
2300         final Identifier slicingId = messageSlice.getIdentifier();
2301
2302         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2303         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2304         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2305         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2306         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2307         MessageCollectorActor.clearMessages(followerActor);
2308
2309         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2310
2311         // Sleep for the heartbeat interval so AppendEntries is sent.
2312         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2313                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2314
2315         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2316
2317         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2318         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2319         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2320         MessageCollectorActor.clearMessages(followerActor);
2321
2322         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2323
2324         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2325         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2326         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2327
2328         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2329
2330         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2331
2332         MessageCollectorActor.clearMessages(followerActor);
2333
2334         // Send another normal payload.
2335
2336         sendReplicate(leaderActorContext, term, 2);
2337
2338         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2339         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2340         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2341         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2342     }
2343
2344     @Test
2345     public void testLargePayloadSlicingExpiration() {
2346         logStart("testLargePayloadSlicingExpiration");
2347
2348         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2349         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2350                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2351         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2352         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2353         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2354         leaderActorContext.setCommitIndex(-1);
2355         leaderActorContext.setLastApplied(-1);
2356
2357         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2358         leader = new Leader(leaderActorContext);
2359         leaderActorContext.setCurrentBehavior(leader);
2360
2361         // Send initial heartbeat reply so follower is marked active
2362         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2363         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2364         MessageCollectorActor.clearMessages(followerActor);
2365
2366         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2367                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2368         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2369
2370         // Sleep for at least 3 * election timeout so the slicing state expires.
2371         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2372                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2373         MessageCollectorActor.clearMessages(followerActor);
2374
2375         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2376
2377         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2378         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2379         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2380
2381         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2382         MessageCollectorActor.clearMessages(followerActor);
2383
2384         // Send an AppendEntriesReply - this should restart the slicing.
2385
2386         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2387                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2388
2389         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2390
2391         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2392     }
2393
2394     @Test
2395     public void testLeaderAddressInAppendEntries() {
2396         logStart("testLeaderAddressInAppendEntries");
2397
2398         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2399         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2400                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2401         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2402         leaderActorContext.setCommitIndex(-1);
2403         leaderActorContext.setLastApplied(-1);
2404
2405         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2406             peerId -> leaderActor.path().toString());
2407
2408         leader = new Leader(leaderActorContext);
2409         leaderActorContext.setCurrentBehavior(leader);
2410
2411         // Initial heartbeat shouldn't have the leader address
2412
2413         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2414         assertFalse(appendEntries.getLeaderAddress().isPresent());
2415         MessageCollectorActor.clearMessages(followerActor);
2416
2417         // Send AppendEntriesReply indicating the follower needs the leader address
2418
2419         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2420                 RaftVersions.CURRENT_VERSION));
2421
2422         // Sleep for the heartbeat interval so AppendEntries is sent.
2423         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2424                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2425
2426         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2427
2428         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2429         assertTrue(appendEntries.getLeaderAddress().isPresent());
2430         assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2431         MessageCollectorActor.clearMessages(followerActor);
2432
2433         // Send AppendEntriesReply indicating the follower does not need the leader address
2434
2435         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2436                 RaftVersions.CURRENT_VERSION));
2437
2438         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2439                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2440
2441         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2442
2443         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2444         assertFalse(appendEntries.getLeaderAddress().isPresent());
2445     }
2446
2447     @Override
2448     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2449             final ActorRef actorRef, final RaftRPC rpc) {
2450         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2451         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2452     }
2453
2454     private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2455
2456         private final long electionTimeOutIntervalMillis;
2457         private final int snapshotChunkSize;
2458
2459         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2460             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2461             this.snapshotChunkSize = snapshotChunkSize;
2462         }
2463
2464         @Override
2465         public FiniteDuration getElectionTimeOutInterval() {
2466             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2467         }
2468
2469         @Override
2470         public int getSnapshotChunkSize() {
2471             return snapshotChunkSize;
2472         }
2473     }
2474 }