Add Payload.serializedSize()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.io.OutputStream;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Optional;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.Payload;
69 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
70 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
71 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
76 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
83
84     static final String FOLLOWER_ID = "follower";
85     public static final String LEADER_ID = "leader";
86
87     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
89
90     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
92
93     private Leader leader;
94     private final short payloadVersion = 5;
95
96     @Override
97     @After
98     public void tearDown() {
99         if (leader != null) {
100             leader.close();
101         }
102
103         super.tearDown();
104     }
105
106     @Test
107     public void testHandleMessageForUnknownMessage() {
108         logStart("testHandleMessageForUnknownMessage");
109
110         leader = new Leader(createActorContext());
111
112         // handle message should null when it receives an unknown message
113         assertNull(leader.handleMessage(followerActor, "foo"));
114     }
115
116     @Test
117     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
119
120         MockRaftActorContext actorContext = createActorContextWithFollower();
121         actorContext.setCommitIndex(-1);
122         actorContext.setPayloadVersion(payloadVersion);
123
124         long term = 1;
125         actorContext.getTermInformation().update(term, "");
126
127         leader = new Leader(actorContext);
128         actorContext.setCurrentBehavior(leader);
129
130         // Leader should send an immediate heartbeat with no entries as follower is inactive.
131         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133         assertEquals("getTerm", term, appendEntries.getTerm());
134         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136         assertEquals("Entries size", 0, appendEntries.getEntries().size());
137         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
138
139         // The follower would normally reply - simulate that explicitly here.
140         leader.handleMessage(followerActor, new AppendEntriesReply(
141                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
143
144         followerActor.underlyingActor().clear();
145
146         // Sleep for the heartbeat interval so AppendEntries is sent.
147         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
149
150         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
151
152         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155         assertEquals("Entries size", 1, appendEntries.getEntries().size());
156         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
159     }
160
161
162     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163         return sendReplicate(actorContext, 1, index);
164     }
165
166     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
167             final long index) {
168         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
169     }
170
171     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
172             final Payload payload) {
173         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
174         actorContext.getReplicatedLog().append(newEntry);
175         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
176     }
177
178     @Test
179     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
180         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
181
182         MockRaftActorContext actorContext = createActorContextWithFollower();
183
184         long term = 1;
185         actorContext.getTermInformation().update(term, "");
186
187         leader = new Leader(actorContext);
188
189         // Leader will send an immediate heartbeat - ignore it.
190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191
192         // The follower would normally reply - simulate that explicitly here.
193         long lastIndex = actorContext.getReplicatedLog().lastIndex();
194         leader.handleMessage(followerActor, new AppendEntriesReply(
195                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
196         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197
198         followerActor.underlyingActor().clear();
199
200         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
201
202         // State should not change
203         assertTrue(raftBehavior instanceof Leader);
204
205         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
206         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
207         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
208         assertEquals("Entries size", 1, appendEntries.getEntries().size());
209         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
210         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
211         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
212         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
213     }
214
215     @Test
216     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
217         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
218
219         MockRaftActorContext actorContext = createActorContextWithFollower();
220         actorContext.setCommitIndex(-1);
221         actorContext.setLastApplied(-1);
222
223         // The raft context is initialized with a couple log entries. However the commitIndex
224         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
225         // committed and applied. Now it regains leadership with a higher term (2).
226         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
227         long newTerm = prevTerm + 1;
228         actorContext.getTermInformation().update(newTerm, "");
229
230         leader = new Leader(actorContext);
231         actorContext.setCurrentBehavior(leader);
232
233         // Leader will send an immediate heartbeat - ignore it.
234         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
235
236         // The follower replies with the leader's current last index and term, simulating that it is
237         // up to date with the leader.
238         long lastIndex = actorContext.getReplicatedLog().lastIndex();
239         leader.handleMessage(followerActor, new AppendEntriesReply(
240                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
241
242         // The commit index should not get updated even though consensus was reached. This is b/c the
243         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
244         // from previous terms by counting replicas".
245         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
246
247         followerActor.underlyingActor().clear();
248
249         // Now replicate a new entry with the new term 2.
250         long newIndex = lastIndex + 1;
251         sendReplicate(actorContext, newTerm, newIndex);
252
253         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
254         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
255         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
256         assertEquals("Entries size", 1, appendEntries.getEntries().size());
257         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
258         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
259         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
260
261         // The follower replies with success. The leader should now update the commit index to the new index
262         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
263         // prior entries are committed indirectly".
264         leader.handleMessage(followerActor, new AppendEntriesReply(
265                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
266
267         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
268     }
269
270     @Test
271     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
272         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
273
274         MockRaftActorContext actorContext = createActorContextWithFollower();
275         actorContext.setRaftPolicy(createRaftPolicy(true, true));
276
277         long term = 1;
278         actorContext.getTermInformation().update(term, "");
279
280         leader = new Leader(actorContext);
281
282         // Leader will send an immediate heartbeat - ignore it.
283         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284
285         // The follower would normally reply - simulate that explicitly here.
286         long lastIndex = actorContext.getReplicatedLog().lastIndex();
287         leader.handleMessage(followerActor, new AppendEntriesReply(
288                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
289         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290
291         followerActor.underlyingActor().clear();
292
293         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
294
295         // State should not change
296         assertTrue(raftBehavior instanceof Leader);
297
298         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
299         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
300         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
301         assertEquals("Entries size", 1, appendEntries.getEntries().size());
302         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
303         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
304         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
305         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
306     }
307
308     @Test
309     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
310         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
311
312         MockRaftActorContext actorContext = createActorContextWithFollower();
313         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
314             @Override
315             public FiniteDuration getHeartBeatInterval() {
316                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
317             }
318         });
319
320         long term = 1;
321         actorContext.getTermInformation().update(term, "");
322
323         leader = new Leader(actorContext);
324
325         // Leader will send an immediate heartbeat - ignore it.
326         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
327
328         // The follower would normally reply - simulate that explicitly here.
329         long lastIndex = actorContext.getReplicatedLog().lastIndex();
330         leader.handleMessage(followerActor, new AppendEntriesReply(
331                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
332         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
333
334         followerActor.underlyingActor().clear();
335
336         for (int i = 0; i < 5; i++) {
337             sendReplicate(actorContext, lastIndex + i + 1);
338         }
339
340         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341         // We expect only 1 message to be sent because of two reasons,
342         // - an append entries reply was not received
343         // - the heartbeat interval has not expired
344         // In this scenario if multiple messages are sent they would likely be duplicates
345         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
346     }
347
348     @Test
349     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
350         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
351
352         MockRaftActorContext actorContext = createActorContextWithFollower();
353         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
354             @Override
355             public FiniteDuration getHeartBeatInterval() {
356                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
357             }
358         });
359
360         long term = 1;
361         actorContext.getTermInformation().update(term, "");
362
363         leader = new Leader(actorContext);
364
365         // Leader will send an immediate heartbeat - ignore it.
366         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
367
368         // The follower would normally reply - simulate that explicitly here.
369         long lastIndex = actorContext.getReplicatedLog().lastIndex();
370         leader.handleMessage(followerActor, new AppendEntriesReply(
371                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
372         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
373
374         followerActor.underlyingActor().clear();
375
376         for (int i = 0; i < 3; i++) {
377             sendReplicate(actorContext, lastIndex + i + 1);
378             leader.handleMessage(followerActor, new AppendEntriesReply(
379                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
380         }
381
382         // We are expecting six messages here -- a request to replicate and a consensus-reached message
383         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
384         assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
385         for (int i = 0; i < 3; i++) {
386             assertRequestEntry(lastIndex, allMessages, i);
387             assertCommitEntry(lastIndex, allMessages, i);
388         }
389
390         // Now perform another commit, eliciting a request to persist
391         sendReplicate(actorContext, lastIndex + 3 + 1);
392         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393         // This elicits another message for request to replicate
394         assertEquals("The number of request entries collected", 7, allMessages.size());
395         assertRequestEntry(lastIndex, allMessages, 3);
396
397         sendReplicate(actorContext, lastIndex + 4 + 1);
398         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
399         assertEquals("The number of request entries collected", 7, allMessages.size());
400     }
401
402     private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
403             final int messageNr) {
404         final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
405         assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
406         assertEquals(ImmutableList.of(), commitReq.getEntries());
407     }
408
409     private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
410             final int messageNr) {
411         final AppendEntries req = allMessages.get(2 * messageNr);
412         assertEquals(lastIndex + messageNr, req.getLeaderCommit());
413
414         final List<ReplicatedLogEntry> entries = req.getEntries();
415         assertEquals(1, entries.size());
416         assertEquals(messageNr + 2, entries.get(0).getIndex());
417     }
418
419     @Test
420     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
421         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
422
423         MockRaftActorContext actorContext = createActorContextWithFollower();
424         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
425             @Override
426             public FiniteDuration getHeartBeatInterval() {
427                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
428             }
429         });
430
431         long term = 1;
432         actorContext.getTermInformation().update(term, "");
433
434         leader = new Leader(actorContext);
435
436         // Leader will send an immediate heartbeat - ignore it.
437         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
438
439         // The follower would normally reply - simulate that explicitly here.
440         long lastIndex = actorContext.getReplicatedLog().lastIndex();
441         leader.handleMessage(followerActor, new AppendEntriesReply(
442                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
443         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
444
445         followerActor.underlyingActor().clear();
446
447         sendReplicate(actorContext, lastIndex + 1);
448
449         // Wait slightly longer than heartbeat duration
450         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
451
452         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
453
454         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
455         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
456
457         assertEquals(1, allMessages.get(0).getEntries().size());
458         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
459         assertEquals(1, allMessages.get(1).getEntries().size());
460         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
461
462     }
463
464     @Test
465     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
466         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
467
468         MockRaftActorContext actorContext = createActorContextWithFollower();
469         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470             @Override
471             public FiniteDuration getHeartBeatInterval() {
472                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
473             }
474         });
475
476         long term = 1;
477         actorContext.getTermInformation().update(term, "");
478
479         leader = new Leader(actorContext);
480
481         // Leader will send an immediate heartbeat - ignore it.
482         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483
484         // The follower would normally reply - simulate that explicitly here.
485         long lastIndex = actorContext.getReplicatedLog().lastIndex();
486         leader.handleMessage(followerActor, new AppendEntriesReply(
487                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489
490         followerActor.underlyingActor().clear();
491
492         for (int i = 0; i < 3; i++) {
493             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
494             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
495         }
496
497         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
498         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
499     }
500
501     @Test
502     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
503         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
504
505         MockRaftActorContext actorContext = createActorContextWithFollower();
506         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
507             @Override
508             public FiniteDuration getHeartBeatInterval() {
509                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
510             }
511         });
512
513         long term = 1;
514         actorContext.getTermInformation().update(term, "");
515
516         leader = new Leader(actorContext);
517
518         // Leader will send an immediate heartbeat - ignore it.
519         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
520
521         // The follower would normally reply - simulate that explicitly here.
522         long lastIndex = actorContext.getReplicatedLog().lastIndex();
523         leader.handleMessage(followerActor, new AppendEntriesReply(
524                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
525         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
526
527         followerActor.underlyingActor().clear();
528
529         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
530         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
531         sendReplicate(actorContext, lastIndex + 1);
532
533         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
534         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
535
536         assertEquals(0, allMessages.get(0).getEntries().size());
537         assertEquals(1, allMessages.get(1).getEntries().size());
538     }
539
540
541     @Test
542     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
543         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
544
545         MockRaftActorContext actorContext = createActorContext();
546
547         leader = new Leader(actorContext);
548
549         actorContext.setLastApplied(0);
550
551         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
552         long term = actorContext.getTermInformation().getCurrentTerm();
553         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
554                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
555
556         actorContext.getReplicatedLog().append(newEntry);
557
558         final Identifier id = new MockIdentifier("state-id");
559         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
560                 new Replicate(leaderActor, id, newEntry, true));
561
562         // State should not change
563         assertTrue(raftBehavior instanceof Leader);
564
565         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
566
567         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
568         // one since lastApplied state is 0.
569         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
570                 leaderActor, ApplyState.class);
571         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
572
573         for (int i = 0; i <= newLogIndex - 1; i++) {
574             ApplyState applyState = applyStateList.get(i);
575             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
576             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
577         }
578
579         ApplyState last = applyStateList.get((int) newLogIndex - 1);
580         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
581         assertEquals("getIdentifier", id, last.getIdentifier());
582     }
583
584     @Test
585     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
586         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
587
588         final MockRaftActorContext actorContext = createActorContextWithFollower();
589
590         Map<String, String> leadersSnapshot = new HashMap<>();
591         leadersSnapshot.put("1", "A");
592         leadersSnapshot.put("2", "B");
593         leadersSnapshot.put("3", "C");
594
595         //clears leaders log
596         actorContext.getReplicatedLog().removeFrom(0);
597
598         final int commitIndex = 3;
599         final int snapshotIndex = 2;
600         final int snapshotTerm = 1;
601
602         // set the snapshot variables in replicatedlog
603         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
604         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
605         actorContext.setCommitIndex(commitIndex);
606         //set follower timeout to 2 mins, helps during debugging
607         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
608
609         leader = new Leader(actorContext);
610
611         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
612         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
613
614         //update follower timestamp
615         leader.markFollowerActive(FOLLOWER_ID);
616
617         ByteString bs = toByteString(leadersSnapshot);
618         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
619                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
620                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
621         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
622                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
623         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
624         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
625
626         //send first chunk and no InstallSnapshotReply received yet
627         fts.getNextChunk();
628         fts.incrementChunkIndex();
629
630         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
631                 TimeUnit.MILLISECONDS);
632
633         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
634
635         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
636
637         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
638
639         //InstallSnapshotReply received
640         fts.markSendStatus(true);
641
642         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
643
644         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
645
646         assertEquals(commitIndex, is.getLastIncludedIndex());
647     }
648
649     @Test
650     public void testSendAppendEntriesSnapshotScenario() {
651         logStart("testSendAppendEntriesSnapshotScenario");
652
653         final MockRaftActorContext actorContext = createActorContextWithFollower();
654
655         Map<String, String> leadersSnapshot = new HashMap<>();
656         leadersSnapshot.put("1", "A");
657         leadersSnapshot.put("2", "B");
658         leadersSnapshot.put("3", "C");
659
660         //clears leaders log
661         actorContext.getReplicatedLog().removeFrom(0);
662
663         final int followersLastIndex = 2;
664         final int snapshotIndex = 3;
665         final int newEntryIndex = 4;
666         final int snapshotTerm = 1;
667         final int currentTerm = 2;
668
669         // set the snapshot variables in replicatedlog
670         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
671         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
672         actorContext.setCommitIndex(followersLastIndex);
673
674         leader = new Leader(actorContext);
675
676         // Leader will send an immediate heartbeat - ignore it.
677         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
678
679         // new entry
680         SimpleReplicatedLogEntry entry =
681                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
682                         new MockRaftActorContext.MockPayload("D"));
683
684         actorContext.getReplicatedLog().append(entry);
685
686         //update follower timestamp
687         leader.markFollowerActive(FOLLOWER_ID);
688
689         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
690         RaftActorBehavior raftBehavior = leader.handleMessage(
691                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
692
693         assertTrue(raftBehavior instanceof Leader);
694
695         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
696     }
697
698     @Test
699     public void testInitiateInstallSnapshot() {
700         logStart("testInitiateInstallSnapshot");
701
702         MockRaftActorContext actorContext = createActorContextWithFollower();
703
704         //clears leaders log
705         actorContext.getReplicatedLog().removeFrom(0);
706
707         final int followersLastIndex = 2;
708         final int snapshotIndex = 3;
709         final int newEntryIndex = 4;
710         final int snapshotTerm = 1;
711         final int currentTerm = 2;
712
713         // set the snapshot variables in replicatedlog
714         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
715         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
716         actorContext.setLastApplied(3);
717         actorContext.setCommitIndex(followersLastIndex);
718
719         leader = new Leader(actorContext);
720
721         // Leader will send an immediate heartbeat - ignore it.
722         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
723
724         // set the snapshot as absent and check if capture-snapshot is invoked.
725         leader.setSnapshotHolder(null);
726
727         // new entry
728         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
729                 new MockRaftActorContext.MockPayload("D"));
730
731         actorContext.getReplicatedLog().append(entry);
732
733         //update follower timestamp
734         leader.markFollowerActive(FOLLOWER_ID);
735
736         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
737
738         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
739
740         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
741
742         assertEquals(3, cs.getLastAppliedIndex());
743         assertEquals(1, cs.getLastAppliedTerm());
744         assertEquals(4, cs.getLastIndex());
745         assertEquals(2, cs.getLastTerm());
746
747         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
748         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
749
750         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
751     }
752
753     @Test
754     public void testInitiateForceInstallSnapshot() throws Exception {
755         logStart("testInitiateForceInstallSnapshot");
756
757         MockRaftActorContext actorContext = createActorContextWithFollower();
758
759         final int followersLastIndex = 2;
760         final int snapshotIndex = -1;
761         final int newEntryIndex = 4;
762         final int snapshotTerm = -1;
763         final int currentTerm = 2;
764
765         // set the snapshot variables in replicatedlog
766         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
767         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
768         actorContext.setLastApplied(3);
769         actorContext.setCommitIndex(followersLastIndex);
770
771         actorContext.getReplicatedLog().removeFrom(0);
772
773         AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
774         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
775
776         leader = new Leader(actorContext);
777         actorContext.setCurrentBehavior(leader);
778
779         // Leader will send an immediate heartbeat - ignore it.
780         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
781
782         // set the snapshot as absent and check if capture-snapshot is invoked.
783         leader.setSnapshotHolder(null);
784
785         for (int i = 0; i < 4; i++) {
786             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
787                     new MockRaftActorContext.MockPayload("X" + i)));
788         }
789
790         // new entry
791         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
792                 new MockRaftActorContext.MockPayload("D"));
793
794         actorContext.getReplicatedLog().append(entry);
795
796         //update follower timestamp
797         leader.markFollowerActive(FOLLOWER_ID);
798
799         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
800         // installed with a SendInstallSnapshot
801         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
802                 RaftVersions.CURRENT_VERSION));
803
804         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
805
806         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
807         assertEquals(3, cs.getLastAppliedIndex());
808         assertEquals(1, cs.getLastAppliedTerm());
809         assertEquals(4, cs.getLastIndex());
810         assertEquals(2, cs.getLastTerm());
811
812         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
813         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
814
815         MessageCollectorActor.clearMessages(followerActor);
816
817         // Sending Replicate message should not initiate another capture since the first is in progress.
818         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
819         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
820
821         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
822         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
823                 RaftVersions.CURRENT_VERSION));
824         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
825
826         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
827         final byte[] bytes = new byte[]{1, 2, 3};
828         installSnapshotStream.get().get().write(bytes);
829         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
830                 Runtime.getRuntime().totalMemory());
831         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
832
833         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
834         MessageCollectorActor.clearMessages(followerActor);
835         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
836                 RaftVersions.CURRENT_VERSION));
837         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
838     }
839
840
841     @Test
842     public void testInstallSnapshot() {
843         logStart("testInstallSnapshot");
844
845         final MockRaftActorContext actorContext = createActorContextWithFollower();
846
847         Map<String, String> leadersSnapshot = new HashMap<>();
848         leadersSnapshot.put("1", "A");
849         leadersSnapshot.put("2", "B");
850         leadersSnapshot.put("3", "C");
851
852         //clears leaders log
853         actorContext.getReplicatedLog().removeFrom(0);
854
855         final int lastAppliedIndex = 3;
856         final int snapshotIndex = 2;
857         final int snapshotTerm = 1;
858         final int currentTerm = 2;
859
860         // set the snapshot variables in replicatedlog
861         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
862         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
863         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
864         actorContext.setCommitIndex(lastAppliedIndex);
865         actorContext.setLastApplied(lastAppliedIndex);
866
867         leader = new Leader(actorContext);
868
869         // Initial heartbeat.
870         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
871
872         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
873         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
874
875         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
876         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
877                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
878
879         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
880                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
881
882         assertTrue(raftBehavior instanceof Leader);
883
884         // check if installsnapshot gets called with the correct values.
885
886         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
887                 InstallSnapshot.class);
888
889         assertNotNull(installSnapshot.getData());
890         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
891         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
892
893         assertEquals(currentTerm, installSnapshot.getTerm());
894     }
895
896     @Test
897     public void testForceInstallSnapshot() {
898         logStart("testForceInstallSnapshot");
899
900         final MockRaftActorContext actorContext = createActorContextWithFollower();
901
902         Map<String, String> leadersSnapshot = new HashMap<>();
903         leadersSnapshot.put("1", "A");
904         leadersSnapshot.put("2", "B");
905         leadersSnapshot.put("3", "C");
906
907         final int lastAppliedIndex = 3;
908         final int snapshotIndex = -1;
909         final int snapshotTerm = -1;
910         final int currentTerm = 2;
911
912         // set the snapshot variables in replicatedlog
913         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916         actorContext.setCommitIndex(lastAppliedIndex);
917         actorContext.setLastApplied(lastAppliedIndex);
918
919         leader = new Leader(actorContext);
920
921         // Initial heartbeat.
922         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
923
924         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
925         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
926
927         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
928         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
929                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
930
931         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
932                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
933
934         assertTrue(raftBehavior instanceof Leader);
935
936         // check if installsnapshot gets called with the correct values.
937
938         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
939                 InstallSnapshot.class);
940
941         assertNotNull(installSnapshot.getData());
942         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
943         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
944
945         assertEquals(currentTerm, installSnapshot.getTerm());
946     }
947
948     @Test
949     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
950         logStart("testHandleInstallSnapshotReplyLastChunk");
951
952         MockRaftActorContext actorContext = createActorContextWithFollower();
953
954         final int commitIndex = 3;
955         final int snapshotIndex = 2;
956         final int snapshotTerm = 1;
957         final int currentTerm = 2;
958
959         actorContext.setCommitIndex(commitIndex);
960
961         leader = new Leader(actorContext);
962         actorContext.setCurrentBehavior(leader);
963
964         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
965         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
966
967         // Ignore initial heartbeat.
968         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
969
970         Map<String, String> leadersSnapshot = new HashMap<>();
971         leadersSnapshot.put("1", "A");
972         leadersSnapshot.put("2", "B");
973         leadersSnapshot.put("3", "C");
974
975         // set the snapshot variables in replicatedlog
976
977         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
978         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
979         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
980
981         ByteString bs = toByteString(leadersSnapshot);
982         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
983                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
984                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
985         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
986                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
987         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
988         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
989         while (!fts.isLastChunk(fts.getChunkIndex())) {
990             fts.getNextChunk();
991             fts.incrementChunkIndex();
992         }
993
994         //clears leaders log
995         actorContext.getReplicatedLog().removeFrom(0);
996
997         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
998                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
999
1000         assertTrue(raftBehavior instanceof Leader);
1001
1002         assertEquals(1, leader.followerLogSize());
1003         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
1004         assertNotNull(fli);
1005         assertNull(fli.getInstallSnapshotState());
1006         assertEquals(commitIndex, fli.getMatchIndex());
1007         assertEquals(commitIndex + 1, fli.getNextIndex());
1008         assertFalse(leader.hasSnapshot());
1009     }
1010
1011     @Test
1012     public void testSendSnapshotfromInstallSnapshotReply() {
1013         logStart("testSendSnapshotfromInstallSnapshotReply");
1014
1015         MockRaftActorContext actorContext = createActorContextWithFollower();
1016
1017         final int commitIndex = 3;
1018         final int snapshotIndex = 2;
1019         final int snapshotTerm = 1;
1020         final int currentTerm = 2;
1021
1022         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1023             @Override
1024             public int getSnapshotChunkSize() {
1025                 return 50;
1026             }
1027         };
1028         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1029         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1030
1031         actorContext.setConfigParams(configParams);
1032         actorContext.setCommitIndex(commitIndex);
1033
1034         leader = new Leader(actorContext);
1035         actorContext.setCurrentBehavior(leader);
1036
1037         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1038         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1039
1040         Map<String, String> leadersSnapshot = new HashMap<>();
1041         leadersSnapshot.put("1", "A");
1042         leadersSnapshot.put("2", "B");
1043         leadersSnapshot.put("3", "C");
1044
1045         // set the snapshot variables in replicatedlog
1046         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1047         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1048         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1049
1050         ByteString bs = toByteString(leadersSnapshot);
1051         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1052                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1053                 -1, null, null);
1054
1055         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1056
1057         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1058                 InstallSnapshot.class);
1059
1060         assertEquals(1, installSnapshot.getChunkIndex());
1061         assertEquals(3, installSnapshot.getTotalChunks());
1062
1063         followerActor.underlyingActor().clear();
1064         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1065                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1066
1067         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1068
1069         assertEquals(2, installSnapshot.getChunkIndex());
1070         assertEquals(3, installSnapshot.getTotalChunks());
1071
1072         followerActor.underlyingActor().clear();
1073         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1074                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1075
1076         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1077
1078         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1079         followerActor.underlyingActor().clear();
1080         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1081                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1082
1083         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1084
1085         assertNull(installSnapshot);
1086     }
1087
1088
1089     @Test
1090     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1091         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1092
1093         MockRaftActorContext actorContext = createActorContextWithFollower();
1094
1095         final int commitIndex = 3;
1096         final int snapshotIndex = 2;
1097         final int snapshotTerm = 1;
1098         final int currentTerm = 2;
1099
1100         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1101             @Override
1102             public int getSnapshotChunkSize() {
1103                 return 50;
1104             }
1105         });
1106
1107         actorContext.setCommitIndex(commitIndex);
1108
1109         leader = new Leader(actorContext);
1110
1111         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1112         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1113
1114         Map<String, String> leadersSnapshot = new HashMap<>();
1115         leadersSnapshot.put("1", "A");
1116         leadersSnapshot.put("2", "B");
1117         leadersSnapshot.put("3", "C");
1118
1119         // set the snapshot variables in replicatedlog
1120         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1121         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1122         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1123
1124         ByteString bs = toByteString(leadersSnapshot);
1125         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1126                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1127                 -1, null, null);
1128
1129         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1130         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1131
1132         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1133                 InstallSnapshot.class);
1134
1135         assertEquals(1, installSnapshot.getChunkIndex());
1136         assertEquals(3, installSnapshot.getTotalChunks());
1137
1138         followerActor.underlyingActor().clear();
1139
1140         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1141                 FOLLOWER_ID, -1, false));
1142
1143         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1144                 TimeUnit.MILLISECONDS);
1145
1146         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1147
1148         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1149
1150         assertEquals(1, installSnapshot.getChunkIndex());
1151         assertEquals(3, installSnapshot.getTotalChunks());
1152     }
1153
1154     @Test
1155     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1156         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1157
1158         MockRaftActorContext actorContext = createActorContextWithFollower();
1159
1160         final int commitIndex = 3;
1161         final int snapshotIndex = 2;
1162         final int snapshotTerm = 1;
1163         final int currentTerm = 2;
1164
1165         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1166             @Override
1167             public int getSnapshotChunkSize() {
1168                 return 50;
1169             }
1170         });
1171
1172         actorContext.setCommitIndex(commitIndex);
1173
1174         leader = new Leader(actorContext);
1175
1176         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1177         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1178
1179         Map<String, String> leadersSnapshot = new HashMap<>();
1180         leadersSnapshot.put("1", "A");
1181         leadersSnapshot.put("2", "B");
1182         leadersSnapshot.put("3", "C");
1183
1184         // set the snapshot variables in replicatedlog
1185         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1186         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1187         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1188
1189         ByteString bs = toByteString(leadersSnapshot);
1190         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1191                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1192                 -1, null, null);
1193
1194         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1195
1196         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1197                 InstallSnapshot.class);
1198
1199         assertEquals(1, installSnapshot.getChunkIndex());
1200         assertEquals(3, installSnapshot.getTotalChunks());
1201         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1202                 installSnapshot.getLastChunkHashCode().getAsInt());
1203
1204         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1205
1206         followerActor.underlyingActor().clear();
1207
1208         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1209                 FOLLOWER_ID, 1, true));
1210
1211         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1212
1213         assertEquals(2, installSnapshot.getChunkIndex());
1214         assertEquals(3, installSnapshot.getTotalChunks());
1215         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
1216     }
1217
1218     @Test
1219     public void testLeaderInstallSnapshotState() throws IOException {
1220         logStart("testLeaderInstallSnapshotState");
1221
1222         Map<String, String> leadersSnapshot = new HashMap<>();
1223         leadersSnapshot.put("1", "A");
1224         leadersSnapshot.put("2", "B");
1225         leadersSnapshot.put("3", "C");
1226
1227         ByteString bs = toByteString(leadersSnapshot);
1228         byte[] barray = bs.toByteArray();
1229
1230         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1231         fts.setSnapshotBytes(ByteSource.wrap(barray));
1232
1233         assertEquals(bs.size(), barray.length);
1234
1235         int chunkIndex = 0;
1236         for (int i = 0; i < barray.length; i = i + 50) {
1237             int length = i + 50;
1238             chunkIndex++;
1239
1240             if (i + 50 > barray.length) {
1241                 length = barray.length;
1242             }
1243
1244             byte[] chunk = fts.getNextChunk();
1245             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1246             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1247
1248             fts.markSendStatus(true);
1249             if (!fts.isLastChunk(chunkIndex)) {
1250                 fts.incrementChunkIndex();
1251             }
1252         }
1253
1254         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1255         fts.close();
1256     }
1257
1258     @Override
1259     protected Leader createBehavior(final RaftActorContext actorContext) {
1260         return new Leader(actorContext);
1261     }
1262
1263     @Override
1264     protected MockRaftActorContext createActorContext() {
1265         return createActorContext(leaderActor);
1266     }
1267
1268     @Override
1269     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1270         return createActorContext(LEADER_ID, actorRef);
1271     }
1272
1273     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1274         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1275         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1276         configParams.setElectionTimeoutFactor(100000);
1277         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1278         context.setConfigParams(configParams);
1279         context.setPayloadVersion(payloadVersion);
1280         return context;
1281     }
1282
1283     private MockRaftActorContext createActorContextWithFollower() {
1284         MockRaftActorContext actorContext = createActorContext();
1285         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1286                 followerActor.path().toString()).build());
1287         return actorContext;
1288     }
1289
1290     private MockRaftActorContext createFollowerActorContextWithLeader() {
1291         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1292         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1293         followerConfig.setElectionTimeoutFactor(10000);
1294         followerActorContext.setConfigParams(followerConfig);
1295         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296         return followerActorContext;
1297     }
1298
1299     @Test
1300     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1301         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1302
1303         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1304
1305         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1306
1307         Follower follower = new Follower(followerActorContext);
1308         followerActor.underlyingActor().setBehavior(follower);
1309         followerActorContext.setCurrentBehavior(follower);
1310
1311         Map<String, String> peerAddresses = new HashMap<>();
1312         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1313
1314         leaderActorContext.setPeerAddresses(peerAddresses);
1315
1316         leaderActorContext.getReplicatedLog().removeFrom(0);
1317
1318         //create 3 entries
1319         leaderActorContext.setReplicatedLog(
1320                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1321
1322         leaderActorContext.setCommitIndex(1);
1323
1324         followerActorContext.getReplicatedLog().removeFrom(0);
1325
1326         // follower too has the exact same log entries and has the same commit index
1327         followerActorContext.setReplicatedLog(
1328                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1329
1330         followerActorContext.setCommitIndex(1);
1331
1332         leader = new Leader(leaderActorContext);
1333         leaderActorContext.setCurrentBehavior(leader);
1334
1335         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1336
1337         assertEquals(-1, appendEntries.getLeaderCommit());
1338         assertEquals(0, appendEntries.getEntries().size());
1339         assertEquals(0, appendEntries.getPrevLogIndex());
1340
1341         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1342                 leaderActor, AppendEntriesReply.class);
1343
1344         assertEquals(2, appendEntriesReply.getLogLastIndex());
1345         assertEquals(1, appendEntriesReply.getLogLastTerm());
1346
1347         // follower returns its next index
1348         assertEquals(2, appendEntriesReply.getLogLastIndex());
1349         assertEquals(1, appendEntriesReply.getLogLastTerm());
1350
1351         follower.close();
1352     }
1353
1354     @Test
1355     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1356         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1357
1358         final MockRaftActorContext leaderActorContext = createActorContext();
1359
1360         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1361         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1362
1363         Follower follower = new Follower(followerActorContext);
1364         followerActor.underlyingActor().setBehavior(follower);
1365         followerActorContext.setCurrentBehavior(follower);
1366
1367         Map<String, String> leaderPeerAddresses = new HashMap<>();
1368         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1369
1370         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1371
1372         leaderActorContext.getReplicatedLog().removeFrom(0);
1373
1374         leaderActorContext.setReplicatedLog(
1375                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1376
1377         leaderActorContext.setCommitIndex(1);
1378
1379         followerActorContext.getReplicatedLog().removeFrom(0);
1380
1381         followerActorContext.setReplicatedLog(
1382                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1383
1384         // follower has the same log entries but its commit index > leaders commit index
1385         followerActorContext.setCommitIndex(2);
1386
1387         leader = new Leader(leaderActorContext);
1388
1389         // Initial heartbeat
1390         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1391
1392         assertEquals(-1, appendEntries.getLeaderCommit());
1393         assertEquals(0, appendEntries.getEntries().size());
1394         assertEquals(0, appendEntries.getPrevLogIndex());
1395
1396         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1397                 leaderActor, AppendEntriesReply.class);
1398
1399         assertEquals(2, appendEntriesReply.getLogLastIndex());
1400         assertEquals(1, appendEntriesReply.getLogLastTerm());
1401
1402         leaderActor.underlyingActor().setBehavior(follower);
1403         leader.handleMessage(followerActor, appendEntriesReply);
1404
1405         leaderActor.underlyingActor().clear();
1406         followerActor.underlyingActor().clear();
1407
1408         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1409                 TimeUnit.MILLISECONDS);
1410
1411         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1412
1413         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1414
1415         assertEquals(2, appendEntries.getLeaderCommit());
1416         assertEquals(0, appendEntries.getEntries().size());
1417         assertEquals(2, appendEntries.getPrevLogIndex());
1418
1419         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1420
1421         assertEquals(2, appendEntriesReply.getLogLastIndex());
1422         assertEquals(1, appendEntriesReply.getLogLastTerm());
1423
1424         assertEquals(2, followerActorContext.getCommitIndex());
1425
1426         follower.close();
1427     }
1428
1429     @Test
1430     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1431         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1432
1433         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1434         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1435                 new FiniteDuration(1000, TimeUnit.SECONDS));
1436
1437         leaderActorContext.setReplicatedLog(
1438                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1439         long leaderCommitIndex = 2;
1440         leaderActorContext.setCommitIndex(leaderCommitIndex);
1441         leaderActorContext.setLastApplied(leaderCommitIndex);
1442
1443         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1444         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1445
1446         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1447
1448         followerActorContext.setReplicatedLog(
1449                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1450         followerActorContext.setCommitIndex(0);
1451         followerActorContext.setLastApplied(0);
1452
1453         Follower follower = new Follower(followerActorContext);
1454         followerActor.underlyingActor().setBehavior(follower);
1455
1456         leader = new Leader(leaderActorContext);
1457
1458         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1459         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1460                 AppendEntriesReply.class);
1461
1462         MessageCollectorActor.clearMessages(followerActor);
1463         MessageCollectorActor.clearMessages(leaderActor);
1464
1465         // Verify initial AppendEntries sent.
1466         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1467         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1468         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1469
1470         leaderActor.underlyingActor().setBehavior(leader);
1471
1472         leader.handleMessage(followerActor, appendEntriesReply);
1473
1474         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1475         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1476
1477         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1478         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1480
1481         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1482         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1483                 appendEntries.getEntries().get(0).getData());
1484         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1485         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1486                 appendEntries.getEntries().get(1).getData());
1487
1488         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1489         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1490
1491         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1492
1493         ApplyState applyState = applyStateList.get(0);
1494         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1495         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1496         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1497                 applyState.getReplicatedLogEntry().getData());
1498
1499         applyState = applyStateList.get(1);
1500         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1501         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1502         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1503                 applyState.getReplicatedLogEntry().getData());
1504
1505         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1506         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1507     }
1508
1509     @Test
1510     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1511         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1512
1513         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1514         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1515                 new FiniteDuration(1000, TimeUnit.SECONDS));
1516
1517         leaderActorContext.setReplicatedLog(
1518                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1519         long leaderCommitIndex = 1;
1520         leaderActorContext.setCommitIndex(leaderCommitIndex);
1521         leaderActorContext.setLastApplied(leaderCommitIndex);
1522
1523         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1524         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1525
1526         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1527
1528         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1529         followerActorContext.setCommitIndex(-1);
1530         followerActorContext.setLastApplied(-1);
1531
1532         Follower follower = new Follower(followerActorContext);
1533         followerActor.underlyingActor().setBehavior(follower);
1534         followerActorContext.setCurrentBehavior(follower);
1535
1536         leader = new Leader(leaderActorContext);
1537
1538         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1539         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1540                 AppendEntriesReply.class);
1541
1542         MessageCollectorActor.clearMessages(followerActor);
1543         MessageCollectorActor.clearMessages(leaderActor);
1544
1545         // Verify initial AppendEntries sent with the leader's current commit index.
1546         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1547         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1548         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1549
1550         leaderActor.underlyingActor().setBehavior(leader);
1551         leaderActorContext.setCurrentBehavior(leader);
1552
1553         leader.handleMessage(followerActor, appendEntriesReply);
1554
1555         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1556         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1557
1558         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1559         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1560         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1561
1562         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1563         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1564                 appendEntries.getEntries().get(0).getData());
1565         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1566         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1567                 appendEntries.getEntries().get(1).getData());
1568
1569         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1570         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1571
1572         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1573
1574         ApplyState applyState = applyStateList.get(0);
1575         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1576         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1577         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1578                 applyState.getReplicatedLogEntry().getData());
1579
1580         applyState = applyStateList.get(1);
1581         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1582         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1583         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1584                 applyState.getReplicatedLogEntry().getData());
1585
1586         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1587         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1588     }
1589
1590     @Test
1591     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1592         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1593
1594         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1595         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1596                 new FiniteDuration(1000, TimeUnit.SECONDS));
1597
1598         leaderActorContext.setReplicatedLog(
1599                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1600         long leaderCommitIndex = 1;
1601         leaderActorContext.setCommitIndex(leaderCommitIndex);
1602         leaderActorContext.setLastApplied(leaderCommitIndex);
1603
1604         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1605         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1606
1607         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1608
1609         followerActorContext.setReplicatedLog(
1610                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1611         followerActorContext.setCommitIndex(-1);
1612         followerActorContext.setLastApplied(-1);
1613
1614         Follower follower = new Follower(followerActorContext);
1615         followerActor.underlyingActor().setBehavior(follower);
1616         followerActorContext.setCurrentBehavior(follower);
1617
1618         leader = new Leader(leaderActorContext);
1619
1620         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1621         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1622                 AppendEntriesReply.class);
1623
1624         MessageCollectorActor.clearMessages(followerActor);
1625         MessageCollectorActor.clearMessages(leaderActor);
1626
1627         // Verify initial AppendEntries sent with the leader's current commit index.
1628         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1629         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1630         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1631
1632         leaderActor.underlyingActor().setBehavior(leader);
1633         leaderActorContext.setCurrentBehavior(leader);
1634
1635         leader.handleMessage(followerActor, appendEntriesReply);
1636
1637         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1638         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1639
1640         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1641         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1642         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1643
1644         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1645         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1646         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1647                 appendEntries.getEntries().get(0).getData());
1648         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1649         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1650         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1651                 appendEntries.getEntries().get(1).getData());
1652
1653         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1654         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1655
1656         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1657
1658         ApplyState applyState = applyStateList.get(0);
1659         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1660         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1661         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1662                 applyState.getReplicatedLogEntry().getData());
1663
1664         applyState = applyStateList.get(1);
1665         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1666         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1667         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1668                 applyState.getReplicatedLogEntry().getData());
1669
1670         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1671         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1672         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1673     }
1674
1675     @Test
1676     public void testHandleAppendEntriesReplyWithNewerTerm() {
1677         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1678
1679         MockRaftActorContext leaderActorContext = createActorContext();
1680         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1681                 new FiniteDuration(10000, TimeUnit.SECONDS));
1682
1683         leaderActorContext.setReplicatedLog(
1684                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1685
1686         leader = new Leader(leaderActorContext);
1687         leaderActor.underlyingActor().setBehavior(leader);
1688         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1689
1690         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1691                 AppendEntriesReply.class);
1692
1693         assertEquals(false, appendEntriesReply.isSuccess());
1694         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1695
1696         MessageCollectorActor.clearMessages(leaderActor);
1697     }
1698
1699     @Test
1700     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1701         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1702
1703         MockRaftActorContext leaderActorContext = createActorContext();
1704         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1705                 new FiniteDuration(10000, TimeUnit.SECONDS));
1706
1707         leaderActorContext.setReplicatedLog(
1708                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1709         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1710
1711         leader = new Leader(leaderActorContext);
1712         leaderActor.underlyingActor().setBehavior(leader);
1713         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1714
1715         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1716                 AppendEntriesReply.class);
1717
1718         assertEquals(false, appendEntriesReply.isSuccess());
1719         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1720
1721         MessageCollectorActor.clearMessages(leaderActor);
1722     }
1723
1724     @Test
1725     public void testHandleAppendEntriesReplySuccess() {
1726         logStart("testHandleAppendEntriesReplySuccess");
1727
1728         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1729
1730         leaderActorContext.setReplicatedLog(
1731                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1732
1733         leaderActorContext.setCommitIndex(1);
1734         leaderActorContext.setLastApplied(1);
1735         leaderActorContext.getTermInformation().update(1, "leader");
1736
1737         leader = new Leader(leaderActorContext);
1738
1739         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1740
1741         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1742         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1743
1744         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1745
1746         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1747
1748         assertEquals(RaftState.Leader, raftActorBehavior.state());
1749
1750         assertEquals(2, leaderActorContext.getCommitIndex());
1751
1752         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1753                 leaderActor, ApplyJournalEntries.class);
1754
1755         assertEquals(2, leaderActorContext.getLastApplied());
1756
1757         assertEquals(2, applyJournalEntries.getToIndex());
1758
1759         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1760                 ApplyState.class);
1761
1762         assertEquals(1,applyStateList.size());
1763
1764         ApplyState applyState = applyStateList.get(0);
1765
1766         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1767
1768         assertEquals(2, followerInfo.getMatchIndex());
1769         assertEquals(3, followerInfo.getNextIndex());
1770         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1771         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1772     }
1773
1774     @Test
1775     public void testHandleAppendEntriesReplyUnknownFollower() {
1776         logStart("testHandleAppendEntriesReplyUnknownFollower");
1777
1778         MockRaftActorContext leaderActorContext = createActorContext();
1779
1780         leader = new Leader(leaderActorContext);
1781
1782         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1783
1784         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1785
1786         assertEquals(RaftState.Leader, raftActorBehavior.state());
1787     }
1788
1789     @Test
1790     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1791         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1792
1793         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1794         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1795                 new FiniteDuration(1000, TimeUnit.SECONDS));
1796         // Note: the size here depends on estimate
1797         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
1798
1799         leaderActorContext.setReplicatedLog(
1800                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1801         long leaderCommitIndex = 3;
1802         leaderActorContext.setCommitIndex(leaderCommitIndex);
1803         leaderActorContext.setLastApplied(leaderCommitIndex);
1804
1805         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1806         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1807         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1808         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1809
1810         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1811
1812         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1813         followerActorContext.setCommitIndex(-1);
1814         followerActorContext.setLastApplied(-1);
1815
1816         Follower follower = new Follower(followerActorContext);
1817         followerActor.underlyingActor().setBehavior(follower);
1818         followerActorContext.setCurrentBehavior(follower);
1819
1820         leader = new Leader(leaderActorContext);
1821
1822         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1823         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1824                 AppendEntriesReply.class);
1825
1826         MessageCollectorActor.clearMessages(followerActor);
1827         MessageCollectorActor.clearMessages(leaderActor);
1828
1829         // Verify initial AppendEntries sent with the leader's current commit index.
1830         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1831         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1832         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1833
1834         leaderActor.underlyingActor().setBehavior(leader);
1835         leaderActorContext.setCurrentBehavior(leader);
1836
1837         leader.handleMessage(followerActor, appendEntriesReply);
1838
1839         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1840                 AppendEntries.class, 2);
1841         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1842
1843         appendEntries = appendEntriesList.get(0);
1844         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1845         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1846         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1847
1848         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1849         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1850                 appendEntries.getEntries().get(0).getData());
1851         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1852         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1853                 appendEntries.getEntries().get(1).getData());
1854
1855         appendEntries = appendEntriesList.get(1);
1856         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1857         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1858         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1859
1860         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1861         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1862                 appendEntries.getEntries().get(0).getData());
1863         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1864         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1865                 appendEntries.getEntries().get(1).getData());
1866
1867         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1868         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1869
1870         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1871
1872         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1873         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1874     }
1875
1876     @Test
1877     public void testHandleRequestVoteReply() {
1878         logStart("testHandleRequestVoteReply");
1879
1880         MockRaftActorContext leaderActorContext = createActorContext();
1881
1882         leader = new Leader(leaderActorContext);
1883
1884         // Should be a no-op.
1885         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1886                 new RequestVoteReply(1, true));
1887
1888         assertEquals(RaftState.Leader, raftActorBehavior.state());
1889
1890         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1891
1892         assertEquals(RaftState.Leader, raftActorBehavior.state());
1893     }
1894
1895     @Test
1896     public void testIsolatedLeaderCheckNoFollowers() {
1897         logStart("testIsolatedLeaderCheckNoFollowers");
1898
1899         MockRaftActorContext leaderActorContext = createActorContext();
1900
1901         leader = new Leader(leaderActorContext);
1902         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1903         assertTrue(newBehavior instanceof Leader);
1904     }
1905
1906     @Test
1907     public void testIsolatedLeaderCheckNoVotingFollowers() {
1908         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1909
1910         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1911         Follower follower = new Follower(followerActorContext);
1912         followerActor.underlyingActor().setBehavior(follower);
1913
1914         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1915         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1916                 new FiniteDuration(1000, TimeUnit.SECONDS));
1917         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1918
1919         leader = new Leader(leaderActorContext);
1920         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1921         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1922         assertTrue("Expected Leader", newBehavior instanceof Leader);
1923     }
1924
1925     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1926         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1927         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1928
1929         MockRaftActorContext leaderActorContext = createActorContext();
1930
1931         Map<String, String> peerAddresses = new HashMap<>();
1932         peerAddresses.put("follower-1", followerActor1.path().toString());
1933         peerAddresses.put("follower-2", followerActor2.path().toString());
1934
1935         leaderActorContext.setPeerAddresses(peerAddresses);
1936         leaderActorContext.setRaftPolicy(raftPolicy);
1937
1938         leader = new Leader(leaderActorContext);
1939
1940         leader.markFollowerActive("follower-1");
1941         leader.markFollowerActive("follower-2");
1942         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1944
1945         // kill 1 follower and verify if that got killed
1946         final TestKit probe = new TestKit(getSystem());
1947         probe.watch(followerActor1);
1948         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1949         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1950         assertEquals(termMsg1.getActor(), followerActor1);
1951
1952         leader.markFollowerInActive("follower-1");
1953         leader.markFollowerActive("follower-2");
1954         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1955         assertTrue("Behavior not instance of Leader when majority of followers are active",
1956                 newBehavior instanceof Leader);
1957
1958         // kill 2nd follower and leader should change to Isolated leader
1959         followerActor2.tell(PoisonPill.getInstance(), null);
1960         probe.watch(followerActor2);
1961         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1962         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1963         assertEquals(termMsg2.getActor(), followerActor2);
1964
1965         leader.markFollowerInActive("follower-2");
1966         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1967     }
1968
1969     @Test
1970     public void testIsolatedLeaderCheckTwoFollowers() {
1971         logStart("testIsolatedLeaderCheckTwoFollowers");
1972
1973         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1974
1975         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1976             newBehavior instanceof IsolatedLeader);
1977     }
1978
1979     @Test
1980     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1981         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1982
1983         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1984
1985         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1986                 newBehavior instanceof Leader);
1987     }
1988
1989     @Test
1990     public void testLaggingFollowerStarvation() {
1991         logStart("testLaggingFollowerStarvation");
1992
1993         String leaderActorId = actorFactory.generateActorId("leader");
1994         String follower1ActorId = actorFactory.generateActorId("follower");
1995         String follower2ActorId = actorFactory.generateActorId("follower");
1996
1997         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1998         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1999
2000         MockRaftActorContext leaderActorContext =
2001                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
2002
2003         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
2004         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
2005         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
2006
2007         leaderActorContext.setConfigParams(configParams);
2008
2009         leaderActorContext.setReplicatedLog(
2010                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2011
2012         Map<String, String> peerAddresses = new HashMap<>();
2013         peerAddresses.put(follower1ActorId,
2014                 follower1Actor.path().toString());
2015         peerAddresses.put(follower2ActorId,
2016                 follower2Actor.path().toString());
2017
2018         leaderActorContext.setPeerAddresses(peerAddresses);
2019         leaderActorContext.getTermInformation().update(1, leaderActorId);
2020
2021         leader = createBehavior(leaderActorContext);
2022
2023         leaderActor.underlyingActor().setBehavior(leader);
2024
2025         for (int i = 1; i < 6; i++) {
2026             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2027             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2028                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2029             assertTrue(newBehavior == leader);
2030             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2031         }
2032
2033         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2034         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2035
2036         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2037                 heartbeats.size() > 1);
2038
2039         // Check if follower-2 got AppendEntries during this time and was not starved
2040         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2041
2042         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2043                 appendEntries.size() > 1);
2044     }
2045
2046     @Test
2047     public void testReplicationConsensusWithNonVotingFollower() {
2048         logStart("testReplicationConsensusWithNonVotingFollower");
2049
2050         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052                 new FiniteDuration(1000, TimeUnit.SECONDS));
2053
2054         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2055         leaderActorContext.setCommitIndex(-1);
2056         leaderActorContext.setLastApplied(-1);
2057
2058         String nonVotingFollowerId = "nonvoting-follower";
2059         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2060                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2061
2062         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2063                 VotingState.NON_VOTING);
2064
2065         leader = new Leader(leaderActorContext);
2066         leaderActorContext.setCurrentBehavior(leader);
2067
2068         // Ignore initial heartbeats
2069         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2071
2072         MessageCollectorActor.clearMessages(followerActor);
2073         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2074         MessageCollectorActor.clearMessages(leaderActor);
2075
2076         // Send a Replicate message and wait for AppendEntries.
2077         sendReplicate(leaderActorContext, 0);
2078
2079         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2080         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2081
2082         // Send reply only from the voting follower and verify consensus via ApplyState.
2083         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2084
2085         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2086
2087         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2088
2089         MessageCollectorActor.clearMessages(followerActor);
2090         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2091         MessageCollectorActor.clearMessages(leaderActor);
2092
2093         // Send another Replicate message
2094         sendReplicate(leaderActorContext, 1);
2095
2096         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2097         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2098                 AppendEntries.class);
2099         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2100         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2101
2102         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2103         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2104
2105         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2106
2107         // Send reply from the voting follower and verify consensus.
2108         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2109
2110         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2111     }
2112
2113     @Test
2114     public void testTransferLeadershipWithFollowerInSync() {
2115         logStart("testTransferLeadershipWithFollowerInSync");
2116
2117         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2118         leaderActorContext.setLastApplied(-1);
2119         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2120                 new FiniteDuration(1000, TimeUnit.SECONDS));
2121         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2122
2123         leader = new Leader(leaderActorContext);
2124         leaderActorContext.setCurrentBehavior(leader);
2125
2126         // Initial heartbeat
2127         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2128         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2129         MessageCollectorActor.clearMessages(followerActor);
2130
2131         sendReplicate(leaderActorContext, 0);
2132         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133
2134         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2135         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2136         MessageCollectorActor.clearMessages(followerActor);
2137
2138         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2139         leader.transferLeadership(mockTransferCohort);
2140
2141         verify(mockTransferCohort, never()).transferComplete();
2142         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2145
2146         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2147         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2148
2149         // Leader should force an election timeout
2150         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2151
2152         verify(mockTransferCohort).transferComplete();
2153     }
2154
2155     @Test
2156     public void testTransferLeadershipWithEmptyLog() {
2157         logStart("testTransferLeadershipWithEmptyLog");
2158
2159         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161                 new FiniteDuration(1000, TimeUnit.SECONDS));
2162         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2163
2164         leader = new Leader(leaderActorContext);
2165         leaderActorContext.setCurrentBehavior(leader);
2166
2167         // Initial heartbeat
2168         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2169         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2170         MessageCollectorActor.clearMessages(followerActor);
2171
2172         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2173         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2174         leader.transferLeadership(mockTransferCohort);
2175
2176         verify(mockTransferCohort, never()).transferComplete();
2177         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2179
2180         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2181         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2182
2183         // Leader should force an election timeout
2184         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2185
2186         verify(mockTransferCohort).transferComplete();
2187     }
2188
2189     @Test
2190     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2191         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2192
2193         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2194         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2195                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2196
2197         leader = new Leader(leaderActorContext);
2198         leaderActorContext.setCurrentBehavior(leader);
2199
2200         // Initial heartbeat
2201         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2202         MessageCollectorActor.clearMessages(followerActor);
2203
2204         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2205         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2206         leader.transferLeadership(mockTransferCohort);
2207
2208         verify(mockTransferCohort, never()).transferComplete();
2209
2210         // Sync up the follower.
2211         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2212         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2213         MessageCollectorActor.clearMessages(followerActor);
2214
2215         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2216                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2217         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2218         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2219         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2220
2221         // Leader should force an election timeout
2222         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2223
2224         verify(mockTransferCohort).transferComplete();
2225     }
2226
2227     @Test
2228     public void testTransferLeadershipWithFollowerSyncTimeout() {
2229         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2230
2231         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2232         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2233                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2234         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2235         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2236
2237         leader = new Leader(leaderActorContext);
2238         leaderActorContext.setCurrentBehavior(leader);
2239
2240         // Initial heartbeat
2241         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2242         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2243         MessageCollectorActor.clearMessages(followerActor);
2244
2245         sendReplicate(leaderActorContext, 0);
2246         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2247
2248         MessageCollectorActor.clearMessages(followerActor);
2249
2250         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2251         leader.transferLeadership(mockTransferCohort);
2252
2253         verify(mockTransferCohort, never()).transferComplete();
2254
2255         // Send heartbeats to time out the transfer.
2256         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2257             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2258                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2259             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2260         }
2261
2262         verify(mockTransferCohort).abortTransfer();
2263         verify(mockTransferCohort, never()).transferComplete();
2264         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2265     }
2266
2267     @Test
2268     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2269         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2270
2271         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2272                 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2273                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2274         final MockRaftActorContext.MockPayload largePayload =
2275                 new MockRaftActorContext.MockPayload("large", serializedSize);
2276
2277         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2278         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2279                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2280         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2281         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2282         leaderActorContext.setCommitIndex(-1);
2283         leaderActorContext.setLastApplied(-1);
2284
2285         leader = new Leader(leaderActorContext);
2286         leaderActorContext.setCurrentBehavior(leader);
2287
2288         // Send initial heartbeat reply so follower is marked active
2289         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2290         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2291         MessageCollectorActor.clearMessages(followerActor);
2292
2293         // Send normal payload first to prime commit index.
2294         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2295         sendReplicate(leaderActorContext, term, 0);
2296
2297         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2298         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2299         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2300
2301         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2302         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2303         MessageCollectorActor.clearMessages(followerActor);
2304
2305         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2306         sendReplicate(leaderActorContext, term, 1, largePayload);
2307
2308         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2309         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2310         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2311
2312         final Identifier slicingId = messageSlice.getIdentifier();
2313
2314         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2315         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2316         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2317         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2318         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2319         MessageCollectorActor.clearMessages(followerActor);
2320
2321         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2322
2323         // Sleep for the heartbeat interval so AppendEntries is sent.
2324         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2325                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2326
2327         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2328
2329         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2330         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2331         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2332         MessageCollectorActor.clearMessages(followerActor);
2333
2334         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2335
2336         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2337         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2338         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2339
2340         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2341
2342         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2343
2344         MessageCollectorActor.clearMessages(followerActor);
2345
2346         // Send another normal payload.
2347
2348         sendReplicate(leaderActorContext, term, 2);
2349
2350         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2351         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2352         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2353         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2354     }
2355
2356     @Test
2357     public void testLargePayloadSlicingExpiration() {
2358         logStart("testLargePayloadSlicingExpiration");
2359
2360         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2361         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2362                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2363         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2364         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2365         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2366         leaderActorContext.setCommitIndex(-1);
2367         leaderActorContext.setLastApplied(-1);
2368
2369         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2370         leader = new Leader(leaderActorContext);
2371         leaderActorContext.setCurrentBehavior(leader);
2372
2373         // Send initial heartbeat reply so follower is marked active
2374         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2375         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2376         MessageCollectorActor.clearMessages(followerActor);
2377
2378         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2379                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2380         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2381
2382         // Sleep for at least 3 * election timeout so the slicing state expires.
2383         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2384                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2385         MessageCollectorActor.clearMessages(followerActor);
2386
2387         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2388
2389         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2390         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2391         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2392
2393         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2394         MessageCollectorActor.clearMessages(followerActor);
2395
2396         // Send an AppendEntriesReply - this should restart the slicing.
2397
2398         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2399                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2400
2401         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2402
2403         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2404     }
2405
2406     @Test
2407     public void testLeaderAddressInAppendEntries() {
2408         logStart("testLeaderAddressInAppendEntries");
2409
2410         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2411         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2412                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2413         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2414         leaderActorContext.setCommitIndex(-1);
2415         leaderActorContext.setLastApplied(-1);
2416
2417         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2418             peerId -> leaderActor.path().toString());
2419
2420         leader = new Leader(leaderActorContext);
2421         leaderActorContext.setCurrentBehavior(leader);
2422
2423         // Initial heartbeat shouldn't have the leader address
2424
2425         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2426         assertFalse(appendEntries.getLeaderAddress().isPresent());
2427         MessageCollectorActor.clearMessages(followerActor);
2428
2429         // Send AppendEntriesReply indicating the follower needs the leader address
2430
2431         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2432                 RaftVersions.CURRENT_VERSION));
2433
2434         // Sleep for the heartbeat interval so AppendEntries is sent.
2435         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2436                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2437
2438         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2439
2440         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2441         assertTrue(appendEntries.getLeaderAddress().isPresent());
2442         assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2443         MessageCollectorActor.clearMessages(followerActor);
2444
2445         // Send AppendEntriesReply indicating the follower does not need the leader address
2446
2447         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2448                 RaftVersions.CURRENT_VERSION));
2449
2450         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2451                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2452
2453         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2454
2455         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2456         assertFalse(appendEntries.getLeaderAddress().isPresent());
2457     }
2458
2459     @Override
2460     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2461             final ActorRef actorRef, final RaftRPC rpc) {
2462         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2463         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2464     }
2465
2466     private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2467
2468         private final long electionTimeOutIntervalMillis;
2469         private final int snapshotChunkSize;
2470
2471         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2472             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2473             this.snapshotChunkSize = snapshotChunkSize;
2474         }
2475
2476         @Override
2477         public FiniteDuration getElectionTimeOutInterval() {
2478             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2479         }
2480
2481         @Override
2482         public int getSnapshotChunkSize() {
2483             return snapshotChunkSize;
2484         }
2485     }
2486 }