Send commitIndex updates to followers as soon as possible
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.protobuf.ByteString;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.io.ByteSource;
33 import com.google.common.util.concurrent.Uninterruptibles;
34 import java.io.IOException;
35 import java.io.OutputStream;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.apache.commons.lang3.SerializationUtils;
44 import org.junit.After;
45 import org.junit.Test;
46 import org.opendaylight.controller.cluster.messaging.MessageSlice;
47 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
48 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
49 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
50 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorContext;
52 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
53 import org.opendaylight.controller.cluster.raft.RaftState;
54 import org.opendaylight.controller.cluster.raft.RaftVersions;
55 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
56 import org.opendaylight.controller.cluster.raft.VotingState;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
58 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
60 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
62 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
66 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
68 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
69 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
70 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
71 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
76 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
77 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
78 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import org.opendaylight.yangtools.concepts.Identifier;
81 import scala.concurrent.duration.FiniteDuration;
82
83 public class LeaderTest extends AbstractLeaderTest<Leader> {
84
85     static final String FOLLOWER_ID = "follower";
86     public static final String LEADER_ID = "leader";
87
88     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
89             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
90
91     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
92             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
93
94     private Leader leader;
95     private final short payloadVersion = 5;
96
97     @Override
98     @After
99     public void tearDown() {
100         if (leader != null) {
101             leader.close();
102         }
103
104         super.tearDown();
105     }
106
107     @Test
108     public void testHandleMessageForUnknownMessage() {
109         logStart("testHandleMessageForUnknownMessage");
110
111         leader = new Leader(createActorContext());
112
113         // handle message should null when it receives an unknown message
114         assertNull(leader.handleMessage(followerActor, "foo"));
115     }
116
117     @Test
118     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
119         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
120
121         MockRaftActorContext actorContext = createActorContextWithFollower();
122         actorContext.setCommitIndex(-1);
123         actorContext.setPayloadVersion(payloadVersion);
124
125         long term = 1;
126         actorContext.getTermInformation().update(term, "");
127
128         leader = new Leader(actorContext);
129         actorContext.setCurrentBehavior(leader);
130
131         // Leader should send an immediate heartbeat with no entries as follower is inactive.
132         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
133         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
134         assertEquals("getTerm", term, appendEntries.getTerm());
135         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
136         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
137         assertEquals("Entries size", 0, appendEntries.getEntries().size());
138         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
139
140         // The follower would normally reply - simulate that explicitly here.
141         leader.handleMessage(followerActor, new AppendEntriesReply(
142                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
143         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
144
145         followerActor.underlyingActor().clear();
146
147         // Sleep for the heartbeat interval so AppendEntries is sent.
148         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
149                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
150
151         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
152
153         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
154         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
155         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
156         assertEquals("Entries size", 1, appendEntries.getEntries().size());
157         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
158         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
159         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
160     }
161
162
163     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
164         return sendReplicate(actorContext, 1, index);
165     }
166
167     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
168             final long index) {
169         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
170     }
171
172     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
173             final Payload payload) {
174         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
175         actorContext.getReplicatedLog().append(newEntry);
176         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
177     }
178
179     @Test
180     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
181         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
182
183         MockRaftActorContext actorContext = createActorContextWithFollower();
184
185         long term = 1;
186         actorContext.getTermInformation().update(term, "");
187
188         leader = new Leader(actorContext);
189
190         // Leader will send an immediate heartbeat - ignore it.
191         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
192
193         // The follower would normally reply - simulate that explicitly here.
194         long lastIndex = actorContext.getReplicatedLog().lastIndex();
195         leader.handleMessage(followerActor, new AppendEntriesReply(
196                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
197         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
198
199         followerActor.underlyingActor().clear();
200
201         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
202
203         // State should not change
204         assertTrue(raftBehavior instanceof Leader);
205
206         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
207         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
208         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
209         assertEquals("Entries size", 1, appendEntries.getEntries().size());
210         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
211         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
212         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
213         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
214     }
215
216     @Test
217     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
218         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
219
220         MockRaftActorContext actorContext = createActorContextWithFollower();
221         actorContext.setCommitIndex(-1);
222         actorContext.setLastApplied(-1);
223
224         // The raft context is initialized with a couple log entries. However the commitIndex
225         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
226         // committed and applied. Now it regains leadership with a higher term (2).
227         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
228         long newTerm = prevTerm + 1;
229         actorContext.getTermInformation().update(newTerm, "");
230
231         leader = new Leader(actorContext);
232         actorContext.setCurrentBehavior(leader);
233
234         // Leader will send an immediate heartbeat - ignore it.
235         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
236
237         // The follower replies with the leader's current last index and term, simulating that it is
238         // up to date with the leader.
239         long lastIndex = actorContext.getReplicatedLog().lastIndex();
240         leader.handleMessage(followerActor, new AppendEntriesReply(
241                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
242
243         // The commit index should not get updated even though consensus was reached. This is b/c the
244         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
245         // from previous terms by counting replicas".
246         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
247
248         followerActor.underlyingActor().clear();
249
250         // Now replicate a new entry with the new term 2.
251         long newIndex = lastIndex + 1;
252         sendReplicate(actorContext, newTerm, newIndex);
253
254         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
255         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
256         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
257         assertEquals("Entries size", 1, appendEntries.getEntries().size());
258         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
259         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
260         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
261
262         // The follower replies with success. The leader should now update the commit index to the new index
263         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
264         // prior entries are committed indirectly".
265         leader.handleMessage(followerActor, new AppendEntriesReply(
266                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
267
268         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
269     }
270
271     @Test
272     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
273         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
274
275         MockRaftActorContext actorContext = createActorContextWithFollower();
276         actorContext.setRaftPolicy(createRaftPolicy(true, true));
277
278         long term = 1;
279         actorContext.getTermInformation().update(term, "");
280
281         leader = new Leader(actorContext);
282
283         // Leader will send an immediate heartbeat - ignore it.
284         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285
286         // The follower would normally reply - simulate that explicitly here.
287         long lastIndex = actorContext.getReplicatedLog().lastIndex();
288         leader.handleMessage(followerActor, new AppendEntriesReply(
289                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
290         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
291
292         followerActor.underlyingActor().clear();
293
294         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
295
296         // State should not change
297         assertTrue(raftBehavior instanceof Leader);
298
299         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
300         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
301         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
302         assertEquals("Entries size", 1, appendEntries.getEntries().size());
303         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
304         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
305         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
306         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
307     }
308
309     @Test
310     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
311         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
312
313         MockRaftActorContext actorContext = createActorContextWithFollower();
314         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
315             @Override
316             public FiniteDuration getHeartBeatInterval() {
317                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
318             }
319         });
320
321         long term = 1;
322         actorContext.getTermInformation().update(term, "");
323
324         leader = new Leader(actorContext);
325
326         // Leader will send an immediate heartbeat - ignore it.
327         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
328
329         // The follower would normally reply - simulate that explicitly here.
330         long lastIndex = actorContext.getReplicatedLog().lastIndex();
331         leader.handleMessage(followerActor, new AppendEntriesReply(
332                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
333         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
334
335         followerActor.underlyingActor().clear();
336
337         for (int i = 0; i < 5; i++) {
338             sendReplicate(actorContext, lastIndex + i + 1);
339         }
340
341         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
342         // We expect only 1 message to be sent because of two reasons,
343         // - an append entries reply was not received
344         // - the heartbeat interval has not expired
345         // In this scenario if multiple messages are sent they would likely be duplicates
346         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
347     }
348
349     @Test
350     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
351         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
352
353         MockRaftActorContext actorContext = createActorContextWithFollower();
354         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
355             @Override
356             public FiniteDuration getHeartBeatInterval() {
357                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
358             }
359         });
360
361         long term = 1;
362         actorContext.getTermInformation().update(term, "");
363
364         leader = new Leader(actorContext);
365
366         // Leader will send an immediate heartbeat - ignore it.
367         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
368
369         // The follower would normally reply - simulate that explicitly here.
370         long lastIndex = actorContext.getReplicatedLog().lastIndex();
371         leader.handleMessage(followerActor, new AppendEntriesReply(
372                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
373         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
374
375         followerActor.underlyingActor().clear();
376
377         for (int i = 0; i < 3; i++) {
378             sendReplicate(actorContext, lastIndex + i + 1);
379             leader.handleMessage(followerActor, new AppendEntriesReply(
380                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
381         }
382
383         // We are expecting six messages here -- a request to replicate and a consensus-reached message
384         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
385         assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
386         for (int i = 0; i < 3; i++) {
387             assertRequestEntry(lastIndex, allMessages, i);
388             assertCommitEntry(lastIndex, allMessages, i);
389         }
390
391         // Now perform another commit, eliciting a request to persist
392         sendReplicate(actorContext, lastIndex + 3 + 1);
393         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
394         // This elicits another message for request to replicate
395         assertEquals("The number of request entries collected", 7, allMessages.size());
396         assertRequestEntry(lastIndex, allMessages, 3);
397
398         sendReplicate(actorContext, lastIndex + 4 + 1);
399         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
400         assertEquals("The number of request entries collected", 7, allMessages.size());
401     }
402
403     private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
404             final int messageNr) {
405         final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
406         assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
407         assertEquals(ImmutableList.of(), commitReq.getEntries());
408     }
409
410     private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
411             final int messageNr) {
412         final AppendEntries req = allMessages.get(2 * messageNr);
413         assertEquals(lastIndex + messageNr, req.getLeaderCommit());
414
415         final List<ReplicatedLogEntry> entries = req.getEntries();
416         assertEquals(1, entries.size());
417         assertEquals(messageNr + 2, entries.get(0).getIndex());
418     }
419
420     @Test
421     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
422         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
423
424         MockRaftActorContext actorContext = createActorContextWithFollower();
425         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
426             @Override
427             public FiniteDuration getHeartBeatInterval() {
428                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
429             }
430         });
431
432         long term = 1;
433         actorContext.getTermInformation().update(term, "");
434
435         leader = new Leader(actorContext);
436
437         // Leader will send an immediate heartbeat - ignore it.
438         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
439
440         // The follower would normally reply - simulate that explicitly here.
441         long lastIndex = actorContext.getReplicatedLog().lastIndex();
442         leader.handleMessage(followerActor, new AppendEntriesReply(
443                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
444         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
445
446         followerActor.underlyingActor().clear();
447
448         sendReplicate(actorContext, lastIndex + 1);
449
450         // Wait slightly longer than heartbeat duration
451         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
452
453         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
454
455         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
456         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
457
458         assertEquals(1, allMessages.get(0).getEntries().size());
459         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
460         assertEquals(1, allMessages.get(1).getEntries().size());
461         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
462
463     }
464
465     @Test
466     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
467         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
468
469         MockRaftActorContext actorContext = createActorContextWithFollower();
470         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
471             @Override
472             public FiniteDuration getHeartBeatInterval() {
473                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
474             }
475         });
476
477         long term = 1;
478         actorContext.getTermInformation().update(term, "");
479
480         leader = new Leader(actorContext);
481
482         // Leader will send an immediate heartbeat - ignore it.
483         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
484
485         // The follower would normally reply - simulate that explicitly here.
486         long lastIndex = actorContext.getReplicatedLog().lastIndex();
487         leader.handleMessage(followerActor, new AppendEntriesReply(
488                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
489         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
490
491         followerActor.underlyingActor().clear();
492
493         for (int i = 0; i < 3; i++) {
494             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
495             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
496         }
497
498         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
499         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
500     }
501
502     @Test
503     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
504         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
505
506         MockRaftActorContext actorContext = createActorContextWithFollower();
507         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
508             @Override
509             public FiniteDuration getHeartBeatInterval() {
510                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
511             }
512         });
513
514         long term = 1;
515         actorContext.getTermInformation().update(term, "");
516
517         leader = new Leader(actorContext);
518
519         // Leader will send an immediate heartbeat - ignore it.
520         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
521
522         // The follower would normally reply - simulate that explicitly here.
523         long lastIndex = actorContext.getReplicatedLog().lastIndex();
524         leader.handleMessage(followerActor, new AppendEntriesReply(
525                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
526         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
527
528         followerActor.underlyingActor().clear();
529
530         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
531         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
532         sendReplicate(actorContext, lastIndex + 1);
533
534         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
535         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
536
537         assertEquals(0, allMessages.get(0).getEntries().size());
538         assertEquals(1, allMessages.get(1).getEntries().size());
539     }
540
541
542     @Test
543     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
544         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
545
546         MockRaftActorContext actorContext = createActorContext();
547
548         leader = new Leader(actorContext);
549
550         actorContext.setLastApplied(0);
551
552         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
553         long term = actorContext.getTermInformation().getCurrentTerm();
554         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
555                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
556
557         actorContext.getReplicatedLog().append(newEntry);
558
559         final Identifier id = new MockIdentifier("state-id");
560         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
561                 new Replicate(leaderActor, id, newEntry, true));
562
563         // State should not change
564         assertTrue(raftBehavior instanceof Leader);
565
566         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
567
568         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
569         // one since lastApplied state is 0.
570         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
571                 leaderActor, ApplyState.class);
572         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
573
574         for (int i = 0; i <= newLogIndex - 1; i++) {
575             ApplyState applyState = applyStateList.get(i);
576             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
577             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
578         }
579
580         ApplyState last = applyStateList.get((int) newLogIndex - 1);
581         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
582         assertEquals("getIdentifier", id, last.getIdentifier());
583     }
584
585     @Test
586     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
587         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
588
589         final MockRaftActorContext actorContext = createActorContextWithFollower();
590
591         Map<String, String> leadersSnapshot = new HashMap<>();
592         leadersSnapshot.put("1", "A");
593         leadersSnapshot.put("2", "B");
594         leadersSnapshot.put("3", "C");
595
596         //clears leaders log
597         actorContext.getReplicatedLog().removeFrom(0);
598
599         final int commitIndex = 3;
600         final int snapshotIndex = 2;
601         final int snapshotTerm = 1;
602
603         // set the snapshot variables in replicatedlog
604         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
605         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
606         actorContext.setCommitIndex(commitIndex);
607         //set follower timeout to 2 mins, helps during debugging
608         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
609
610         leader = new Leader(actorContext);
611
612         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
613         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
614
615         //update follower timestamp
616         leader.markFollowerActive(FOLLOWER_ID);
617
618         ByteString bs = toByteString(leadersSnapshot);
619         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
620                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
621                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
622         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
623                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
624         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
625         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
626
627         //send first chunk and no InstallSnapshotReply received yet
628         fts.getNextChunk();
629         fts.incrementChunkIndex();
630
631         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
632                 TimeUnit.MILLISECONDS);
633
634         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
635
636         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
637
638         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
639
640         //InstallSnapshotReply received
641         fts.markSendStatus(true);
642
643         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
644
645         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
646
647         assertEquals(commitIndex, is.getLastIncludedIndex());
648     }
649
650     @Test
651     public void testSendAppendEntriesSnapshotScenario() {
652         logStart("testSendAppendEntriesSnapshotScenario");
653
654         final MockRaftActorContext actorContext = createActorContextWithFollower();
655
656         Map<String, String> leadersSnapshot = new HashMap<>();
657         leadersSnapshot.put("1", "A");
658         leadersSnapshot.put("2", "B");
659         leadersSnapshot.put("3", "C");
660
661         //clears leaders log
662         actorContext.getReplicatedLog().removeFrom(0);
663
664         final int followersLastIndex = 2;
665         final int snapshotIndex = 3;
666         final int newEntryIndex = 4;
667         final int snapshotTerm = 1;
668         final int currentTerm = 2;
669
670         // set the snapshot variables in replicatedlog
671         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
672         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
673         actorContext.setCommitIndex(followersLastIndex);
674
675         leader = new Leader(actorContext);
676
677         // Leader will send an immediate heartbeat - ignore it.
678         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
679
680         // new entry
681         SimpleReplicatedLogEntry entry =
682                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
683                         new MockRaftActorContext.MockPayload("D"));
684
685         actorContext.getReplicatedLog().append(entry);
686
687         //update follower timestamp
688         leader.markFollowerActive(FOLLOWER_ID);
689
690         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
691         RaftActorBehavior raftBehavior = leader.handleMessage(
692                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
693
694         assertTrue(raftBehavior instanceof Leader);
695
696         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
697     }
698
699     @Test
700     public void testInitiateInstallSnapshot() {
701         logStart("testInitiateInstallSnapshot");
702
703         MockRaftActorContext actorContext = createActorContextWithFollower();
704
705         //clears leaders log
706         actorContext.getReplicatedLog().removeFrom(0);
707
708         final int followersLastIndex = 2;
709         final int snapshotIndex = 3;
710         final int newEntryIndex = 4;
711         final int snapshotTerm = 1;
712         final int currentTerm = 2;
713
714         // set the snapshot variables in replicatedlog
715         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
716         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
717         actorContext.setLastApplied(3);
718         actorContext.setCommitIndex(followersLastIndex);
719
720         leader = new Leader(actorContext);
721
722         // Leader will send an immediate heartbeat - ignore it.
723         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
724
725         // set the snapshot as absent and check if capture-snapshot is invoked.
726         leader.setSnapshotHolder(null);
727
728         // new entry
729         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
730                 new MockRaftActorContext.MockPayload("D"));
731
732         actorContext.getReplicatedLog().append(entry);
733
734         //update follower timestamp
735         leader.markFollowerActive(FOLLOWER_ID);
736
737         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
738
739         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
740
741         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
742
743         assertEquals(3, cs.getLastAppliedIndex());
744         assertEquals(1, cs.getLastAppliedTerm());
745         assertEquals(4, cs.getLastIndex());
746         assertEquals(2, cs.getLastTerm());
747
748         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
749         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
750
751         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
752     }
753
754     @Test
755     public void testInitiateForceInstallSnapshot() throws Exception {
756         logStart("testInitiateForceInstallSnapshot");
757
758         MockRaftActorContext actorContext = createActorContextWithFollower();
759
760         final int followersLastIndex = 2;
761         final int snapshotIndex = -1;
762         final int newEntryIndex = 4;
763         final int snapshotTerm = -1;
764         final int currentTerm = 2;
765
766         // set the snapshot variables in replicatedlog
767         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
768         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
769         actorContext.setLastApplied(3);
770         actorContext.setCommitIndex(followersLastIndex);
771
772         actorContext.getReplicatedLog().removeFrom(0);
773
774         AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
775         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
776
777         leader = new Leader(actorContext);
778         actorContext.setCurrentBehavior(leader);
779
780         // Leader will send an immediate heartbeat - ignore it.
781         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
782
783         // set the snapshot as absent and check if capture-snapshot is invoked.
784         leader.setSnapshotHolder(null);
785
786         for (int i = 0; i < 4; i++) {
787             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
788                     new MockRaftActorContext.MockPayload("X" + i)));
789         }
790
791         // new entry
792         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
793                 new MockRaftActorContext.MockPayload("D"));
794
795         actorContext.getReplicatedLog().append(entry);
796
797         //update follower timestamp
798         leader.markFollowerActive(FOLLOWER_ID);
799
800         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
801         // installed with a SendInstallSnapshot
802         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
803                 RaftVersions.CURRENT_VERSION));
804
805         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
806
807         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
808         assertEquals(3, cs.getLastAppliedIndex());
809         assertEquals(1, cs.getLastAppliedTerm());
810         assertEquals(4, cs.getLastIndex());
811         assertEquals(2, cs.getLastTerm());
812
813         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
814         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
815
816         MessageCollectorActor.clearMessages(followerActor);
817
818         // Sending Replicate message should not initiate another capture since the first is in progress.
819         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
820         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
821
822         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
823         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
824                 RaftVersions.CURRENT_VERSION));
825         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
826
827         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
828         final byte[] bytes = new byte[]{1, 2, 3};
829         installSnapshotStream.get().get().write(bytes);
830         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
831                 Runtime.getRuntime().totalMemory());
832         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
833
834         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
835         MessageCollectorActor.clearMessages(followerActor);
836         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
837                 RaftVersions.CURRENT_VERSION));
838         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
839     }
840
841
842     @Test
843     public void testInstallSnapshot() {
844         logStart("testInstallSnapshot");
845
846         final MockRaftActorContext actorContext = createActorContextWithFollower();
847
848         Map<String, String> leadersSnapshot = new HashMap<>();
849         leadersSnapshot.put("1", "A");
850         leadersSnapshot.put("2", "B");
851         leadersSnapshot.put("3", "C");
852
853         //clears leaders log
854         actorContext.getReplicatedLog().removeFrom(0);
855
856         final int lastAppliedIndex = 3;
857         final int snapshotIndex = 2;
858         final int snapshotTerm = 1;
859         final int currentTerm = 2;
860
861         // set the snapshot variables in replicatedlog
862         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
863         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
864         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
865         actorContext.setCommitIndex(lastAppliedIndex);
866         actorContext.setLastApplied(lastAppliedIndex);
867
868         leader = new Leader(actorContext);
869
870         // Initial heartbeat.
871         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
872
873         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
874         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
875
876         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
877         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
878                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
879
880         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
881                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
882
883         assertTrue(raftBehavior instanceof Leader);
884
885         // check if installsnapshot gets called with the correct values.
886
887         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
888                 InstallSnapshot.class);
889
890         assertNotNull(installSnapshot.getData());
891         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
892         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
893
894         assertEquals(currentTerm, installSnapshot.getTerm());
895     }
896
897     @Test
898     public void testForceInstallSnapshot() {
899         logStart("testForceInstallSnapshot");
900
901         final MockRaftActorContext actorContext = createActorContextWithFollower();
902
903         Map<String, String> leadersSnapshot = new HashMap<>();
904         leadersSnapshot.put("1", "A");
905         leadersSnapshot.put("2", "B");
906         leadersSnapshot.put("3", "C");
907
908         final int lastAppliedIndex = 3;
909         final int snapshotIndex = -1;
910         final int snapshotTerm = -1;
911         final int currentTerm = 2;
912
913         // set the snapshot variables in replicatedlog
914         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
915         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
916         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
917         actorContext.setCommitIndex(lastAppliedIndex);
918         actorContext.setLastApplied(lastAppliedIndex);
919
920         leader = new Leader(actorContext);
921
922         // Initial heartbeat.
923         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
924
925         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
926         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
927
928         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
929         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
930                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
931
932         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
933                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
934
935         assertTrue(raftBehavior instanceof Leader);
936
937         // check if installsnapshot gets called with the correct values.
938
939         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
940                 InstallSnapshot.class);
941
942         assertNotNull(installSnapshot.getData());
943         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
944         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
945
946         assertEquals(currentTerm, installSnapshot.getTerm());
947     }
948
949     @Test
950     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
951         logStart("testHandleInstallSnapshotReplyLastChunk");
952
953         MockRaftActorContext actorContext = createActorContextWithFollower();
954
955         final int commitIndex = 3;
956         final int snapshotIndex = 2;
957         final int snapshotTerm = 1;
958         final int currentTerm = 2;
959
960         actorContext.setCommitIndex(commitIndex);
961
962         leader = new Leader(actorContext);
963         actorContext.setCurrentBehavior(leader);
964
965         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
966         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
967
968         // Ignore initial heartbeat.
969         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
970
971         Map<String, String> leadersSnapshot = new HashMap<>();
972         leadersSnapshot.put("1", "A");
973         leadersSnapshot.put("2", "B");
974         leadersSnapshot.put("3", "C");
975
976         // set the snapshot variables in replicatedlog
977
978         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
979         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
980         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
981
982         ByteString bs = toByteString(leadersSnapshot);
983         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
984                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
985                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
986         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
987                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
988         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
989         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
990         while (!fts.isLastChunk(fts.getChunkIndex())) {
991             fts.getNextChunk();
992             fts.incrementChunkIndex();
993         }
994
995         //clears leaders log
996         actorContext.getReplicatedLog().removeFrom(0);
997
998         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
999                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
1000
1001         assertTrue(raftBehavior instanceof Leader);
1002
1003         assertEquals(1, leader.followerLogSize());
1004         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
1005         assertNotNull(fli);
1006         assertNull(fli.getInstallSnapshotState());
1007         assertEquals(commitIndex, fli.getMatchIndex());
1008         assertEquals(commitIndex + 1, fli.getNextIndex());
1009         assertFalse(leader.hasSnapshot());
1010     }
1011
1012     @Test
1013     public void testSendSnapshotfromInstallSnapshotReply() {
1014         logStart("testSendSnapshotfromInstallSnapshotReply");
1015
1016         MockRaftActorContext actorContext = createActorContextWithFollower();
1017
1018         final int commitIndex = 3;
1019         final int snapshotIndex = 2;
1020         final int snapshotTerm = 1;
1021         final int currentTerm = 2;
1022
1023         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1024             @Override
1025             public int getSnapshotChunkSize() {
1026                 return 50;
1027             }
1028         };
1029         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1030         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1031
1032         actorContext.setConfigParams(configParams);
1033         actorContext.setCommitIndex(commitIndex);
1034
1035         leader = new Leader(actorContext);
1036         actorContext.setCurrentBehavior(leader);
1037
1038         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1039         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1040
1041         Map<String, String> leadersSnapshot = new HashMap<>();
1042         leadersSnapshot.put("1", "A");
1043         leadersSnapshot.put("2", "B");
1044         leadersSnapshot.put("3", "C");
1045
1046         // set the snapshot variables in replicatedlog
1047         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1048         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1049         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1050
1051         ByteString bs = toByteString(leadersSnapshot);
1052         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1053                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1054                 -1, null, null);
1055
1056         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1057
1058         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1059                 InstallSnapshot.class);
1060
1061         assertEquals(1, installSnapshot.getChunkIndex());
1062         assertEquals(3, installSnapshot.getTotalChunks());
1063
1064         followerActor.underlyingActor().clear();
1065         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1066                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1067
1068         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1069
1070         assertEquals(2, installSnapshot.getChunkIndex());
1071         assertEquals(3, installSnapshot.getTotalChunks());
1072
1073         followerActor.underlyingActor().clear();
1074         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1075                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1076
1077         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1078
1079         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1080         followerActor.underlyingActor().clear();
1081         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1082                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1083
1084         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1085
1086         assertNull(installSnapshot);
1087     }
1088
1089
1090     @Test
1091     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1092         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1093
1094         MockRaftActorContext actorContext = createActorContextWithFollower();
1095
1096         final int commitIndex = 3;
1097         final int snapshotIndex = 2;
1098         final int snapshotTerm = 1;
1099         final int currentTerm = 2;
1100
1101         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1102             @Override
1103             public int getSnapshotChunkSize() {
1104                 return 50;
1105             }
1106         });
1107
1108         actorContext.setCommitIndex(commitIndex);
1109
1110         leader = new Leader(actorContext);
1111
1112         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1113         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1114
1115         Map<String, String> leadersSnapshot = new HashMap<>();
1116         leadersSnapshot.put("1", "A");
1117         leadersSnapshot.put("2", "B");
1118         leadersSnapshot.put("3", "C");
1119
1120         // set the snapshot variables in replicatedlog
1121         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1122         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1123         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1124
1125         ByteString bs = toByteString(leadersSnapshot);
1126         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1127                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1128                 -1, null, null);
1129
1130         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1131         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1132
1133         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1134                 InstallSnapshot.class);
1135
1136         assertEquals(1, installSnapshot.getChunkIndex());
1137         assertEquals(3, installSnapshot.getTotalChunks());
1138
1139         followerActor.underlyingActor().clear();
1140
1141         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1142                 FOLLOWER_ID, -1, false));
1143
1144         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1145                 TimeUnit.MILLISECONDS);
1146
1147         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1148
1149         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1150
1151         assertEquals(1, installSnapshot.getChunkIndex());
1152         assertEquals(3, installSnapshot.getTotalChunks());
1153     }
1154
1155     @Test
1156     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1157         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1158
1159         MockRaftActorContext actorContext = createActorContextWithFollower();
1160
1161         final int commitIndex = 3;
1162         final int snapshotIndex = 2;
1163         final int snapshotTerm = 1;
1164         final int currentTerm = 2;
1165
1166         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1167             @Override
1168             public int getSnapshotChunkSize() {
1169                 return 50;
1170             }
1171         });
1172
1173         actorContext.setCommitIndex(commitIndex);
1174
1175         leader = new Leader(actorContext);
1176
1177         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1178         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1179
1180         Map<String, String> leadersSnapshot = new HashMap<>();
1181         leadersSnapshot.put("1", "A");
1182         leadersSnapshot.put("2", "B");
1183         leadersSnapshot.put("3", "C");
1184
1185         // set the snapshot variables in replicatedlog
1186         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1187         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1188         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1189
1190         ByteString bs = toByteString(leadersSnapshot);
1191         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1192                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1193                 -1, null, null);
1194
1195         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1196
1197         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1198                 InstallSnapshot.class);
1199
1200         assertEquals(1, installSnapshot.getChunkIndex());
1201         assertEquals(3, installSnapshot.getTotalChunks());
1202         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1203                 installSnapshot.getLastChunkHashCode().get().intValue());
1204
1205         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1206
1207         followerActor.underlyingActor().clear();
1208
1209         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1210                 FOLLOWER_ID, 1, true));
1211
1212         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1213
1214         assertEquals(2, installSnapshot.getChunkIndex());
1215         assertEquals(3, installSnapshot.getTotalChunks());
1216         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1217     }
1218
1219     @Test
1220     public void testLeaderInstallSnapshotState() throws IOException {
1221         logStart("testLeaderInstallSnapshotState");
1222
1223         Map<String, String> leadersSnapshot = new HashMap<>();
1224         leadersSnapshot.put("1", "A");
1225         leadersSnapshot.put("2", "B");
1226         leadersSnapshot.put("3", "C");
1227
1228         ByteString bs = toByteString(leadersSnapshot);
1229         byte[] barray = bs.toByteArray();
1230
1231         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1232         fts.setSnapshotBytes(ByteSource.wrap(barray));
1233
1234         assertEquals(bs.size(), barray.length);
1235
1236         int chunkIndex = 0;
1237         for (int i = 0; i < barray.length; i = i + 50) {
1238             int length = i + 50;
1239             chunkIndex++;
1240
1241             if (i + 50 > barray.length) {
1242                 length = barray.length;
1243             }
1244
1245             byte[] chunk = fts.getNextChunk();
1246             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1247             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1248
1249             fts.markSendStatus(true);
1250             if (!fts.isLastChunk(chunkIndex)) {
1251                 fts.incrementChunkIndex();
1252             }
1253         }
1254
1255         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1256         fts.close();
1257     }
1258
1259     @Override
1260     protected Leader createBehavior(final RaftActorContext actorContext) {
1261         return new Leader(actorContext);
1262     }
1263
1264     @Override
1265     protected MockRaftActorContext createActorContext() {
1266         return createActorContext(leaderActor);
1267     }
1268
1269     @Override
1270     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1271         return createActorContext(LEADER_ID, actorRef);
1272     }
1273
1274     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1275         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1276         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1277         configParams.setElectionTimeoutFactor(100000);
1278         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1279         context.setConfigParams(configParams);
1280         context.setPayloadVersion(payloadVersion);
1281         return context;
1282     }
1283
1284     private MockRaftActorContext createActorContextWithFollower() {
1285         MockRaftActorContext actorContext = createActorContext();
1286         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1287                 followerActor.path().toString()).build());
1288         return actorContext;
1289     }
1290
1291     private MockRaftActorContext createFollowerActorContextWithLeader() {
1292         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1293         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1294         followerConfig.setElectionTimeoutFactor(10000);
1295         followerActorContext.setConfigParams(followerConfig);
1296         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1297         return followerActorContext;
1298     }
1299
1300     @Test
1301     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1302         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1303
1304         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1305
1306         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1307
1308         Follower follower = new Follower(followerActorContext);
1309         followerActor.underlyingActor().setBehavior(follower);
1310         followerActorContext.setCurrentBehavior(follower);
1311
1312         Map<String, String> peerAddresses = new HashMap<>();
1313         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1314
1315         leaderActorContext.setPeerAddresses(peerAddresses);
1316
1317         leaderActorContext.getReplicatedLog().removeFrom(0);
1318
1319         //create 3 entries
1320         leaderActorContext.setReplicatedLog(
1321                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1322
1323         leaderActorContext.setCommitIndex(1);
1324
1325         followerActorContext.getReplicatedLog().removeFrom(0);
1326
1327         // follower too has the exact same log entries and has the same commit index
1328         followerActorContext.setReplicatedLog(
1329                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1330
1331         followerActorContext.setCommitIndex(1);
1332
1333         leader = new Leader(leaderActorContext);
1334         leaderActorContext.setCurrentBehavior(leader);
1335
1336         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1337
1338         assertEquals(-1, appendEntries.getLeaderCommit());
1339         assertEquals(0, appendEntries.getEntries().size());
1340         assertEquals(0, appendEntries.getPrevLogIndex());
1341
1342         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1343                 leaderActor, AppendEntriesReply.class);
1344
1345         assertEquals(2, appendEntriesReply.getLogLastIndex());
1346         assertEquals(1, appendEntriesReply.getLogLastTerm());
1347
1348         // follower returns its next index
1349         assertEquals(2, appendEntriesReply.getLogLastIndex());
1350         assertEquals(1, appendEntriesReply.getLogLastTerm());
1351
1352         follower.close();
1353     }
1354
1355     @Test
1356     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1357         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1358
1359         final MockRaftActorContext leaderActorContext = createActorContext();
1360
1361         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1362         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1363
1364         Follower follower = new Follower(followerActorContext);
1365         followerActor.underlyingActor().setBehavior(follower);
1366         followerActorContext.setCurrentBehavior(follower);
1367
1368         Map<String, String> leaderPeerAddresses = new HashMap<>();
1369         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1370
1371         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1372
1373         leaderActorContext.getReplicatedLog().removeFrom(0);
1374
1375         leaderActorContext.setReplicatedLog(
1376                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1377
1378         leaderActorContext.setCommitIndex(1);
1379
1380         followerActorContext.getReplicatedLog().removeFrom(0);
1381
1382         followerActorContext.setReplicatedLog(
1383                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1384
1385         // follower has the same log entries but its commit index > leaders commit index
1386         followerActorContext.setCommitIndex(2);
1387
1388         leader = new Leader(leaderActorContext);
1389
1390         // Initial heartbeat
1391         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1392
1393         assertEquals(-1, appendEntries.getLeaderCommit());
1394         assertEquals(0, appendEntries.getEntries().size());
1395         assertEquals(0, appendEntries.getPrevLogIndex());
1396
1397         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1398                 leaderActor, AppendEntriesReply.class);
1399
1400         assertEquals(2, appendEntriesReply.getLogLastIndex());
1401         assertEquals(1, appendEntriesReply.getLogLastTerm());
1402
1403         leaderActor.underlyingActor().setBehavior(follower);
1404         leader.handleMessage(followerActor, appendEntriesReply);
1405
1406         leaderActor.underlyingActor().clear();
1407         followerActor.underlyingActor().clear();
1408
1409         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1410                 TimeUnit.MILLISECONDS);
1411
1412         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1413
1414         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1415
1416         assertEquals(2, appendEntries.getLeaderCommit());
1417         assertEquals(0, appendEntries.getEntries().size());
1418         assertEquals(2, appendEntries.getPrevLogIndex());
1419
1420         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1421
1422         assertEquals(2, appendEntriesReply.getLogLastIndex());
1423         assertEquals(1, appendEntriesReply.getLogLastTerm());
1424
1425         assertEquals(2, followerActorContext.getCommitIndex());
1426
1427         follower.close();
1428     }
1429
1430     @Test
1431     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1432         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1433
1434         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1435         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1436                 new FiniteDuration(1000, TimeUnit.SECONDS));
1437
1438         leaderActorContext.setReplicatedLog(
1439                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1440         long leaderCommitIndex = 2;
1441         leaderActorContext.setCommitIndex(leaderCommitIndex);
1442         leaderActorContext.setLastApplied(leaderCommitIndex);
1443
1444         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1445         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1446
1447         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1448
1449         followerActorContext.setReplicatedLog(
1450                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1451         followerActorContext.setCommitIndex(0);
1452         followerActorContext.setLastApplied(0);
1453
1454         Follower follower = new Follower(followerActorContext);
1455         followerActor.underlyingActor().setBehavior(follower);
1456
1457         leader = new Leader(leaderActorContext);
1458
1459         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1460         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1461                 AppendEntriesReply.class);
1462
1463         MessageCollectorActor.clearMessages(followerActor);
1464         MessageCollectorActor.clearMessages(leaderActor);
1465
1466         // Verify initial AppendEntries sent.
1467         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1468         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1469         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1470
1471         leaderActor.underlyingActor().setBehavior(leader);
1472
1473         leader.handleMessage(followerActor, appendEntriesReply);
1474
1475         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1476         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1477
1478         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1479         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1480         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1481
1482         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1483         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1484                 appendEntries.getEntries().get(0).getData());
1485         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1486         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1487                 appendEntries.getEntries().get(1).getData());
1488
1489         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1490         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1491
1492         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1493
1494         ApplyState applyState = applyStateList.get(0);
1495         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1496         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1497         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1498                 applyState.getReplicatedLogEntry().getData());
1499
1500         applyState = applyStateList.get(1);
1501         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1502         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1503         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1504                 applyState.getReplicatedLogEntry().getData());
1505
1506         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1507         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1508     }
1509
1510     @Test
1511     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1512         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1513
1514         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1515         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1516                 new FiniteDuration(1000, TimeUnit.SECONDS));
1517
1518         leaderActorContext.setReplicatedLog(
1519                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1520         long leaderCommitIndex = 1;
1521         leaderActorContext.setCommitIndex(leaderCommitIndex);
1522         leaderActorContext.setLastApplied(leaderCommitIndex);
1523
1524         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1525         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1526
1527         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1528
1529         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1530         followerActorContext.setCommitIndex(-1);
1531         followerActorContext.setLastApplied(-1);
1532
1533         Follower follower = new Follower(followerActorContext);
1534         followerActor.underlyingActor().setBehavior(follower);
1535         followerActorContext.setCurrentBehavior(follower);
1536
1537         leader = new Leader(leaderActorContext);
1538
1539         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1540         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1541                 AppendEntriesReply.class);
1542
1543         MessageCollectorActor.clearMessages(followerActor);
1544         MessageCollectorActor.clearMessages(leaderActor);
1545
1546         // Verify initial AppendEntries sent with the leader's current commit index.
1547         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1548         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1549         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1550
1551         leaderActor.underlyingActor().setBehavior(leader);
1552         leaderActorContext.setCurrentBehavior(leader);
1553
1554         leader.handleMessage(followerActor, appendEntriesReply);
1555
1556         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1557         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1558
1559         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1560         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1561         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1562
1563         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1564         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1565                 appendEntries.getEntries().get(0).getData());
1566         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1567         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1568                 appendEntries.getEntries().get(1).getData());
1569
1570         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1571         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1572
1573         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1574
1575         ApplyState applyState = applyStateList.get(0);
1576         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1577         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1578         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1579                 applyState.getReplicatedLogEntry().getData());
1580
1581         applyState = applyStateList.get(1);
1582         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1583         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1584         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1585                 applyState.getReplicatedLogEntry().getData());
1586
1587         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1588         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1589     }
1590
1591     @Test
1592     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1593         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1594
1595         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1596         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1597                 new FiniteDuration(1000, TimeUnit.SECONDS));
1598
1599         leaderActorContext.setReplicatedLog(
1600                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1601         long leaderCommitIndex = 1;
1602         leaderActorContext.setCommitIndex(leaderCommitIndex);
1603         leaderActorContext.setLastApplied(leaderCommitIndex);
1604
1605         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1606         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1607
1608         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1609
1610         followerActorContext.setReplicatedLog(
1611                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1612         followerActorContext.setCommitIndex(-1);
1613         followerActorContext.setLastApplied(-1);
1614
1615         Follower follower = new Follower(followerActorContext);
1616         followerActor.underlyingActor().setBehavior(follower);
1617         followerActorContext.setCurrentBehavior(follower);
1618
1619         leader = new Leader(leaderActorContext);
1620
1621         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1622         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1623                 AppendEntriesReply.class);
1624
1625         MessageCollectorActor.clearMessages(followerActor);
1626         MessageCollectorActor.clearMessages(leaderActor);
1627
1628         // Verify initial AppendEntries sent with the leader's current commit index.
1629         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1630         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1631         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1632
1633         leaderActor.underlyingActor().setBehavior(leader);
1634         leaderActorContext.setCurrentBehavior(leader);
1635
1636         leader.handleMessage(followerActor, appendEntriesReply);
1637
1638         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1639         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1640
1641         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1642         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1643         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1644
1645         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1646         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1647         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1648                 appendEntries.getEntries().get(0).getData());
1649         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1650         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1651         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1652                 appendEntries.getEntries().get(1).getData());
1653
1654         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1655         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1656
1657         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1658
1659         ApplyState applyState = applyStateList.get(0);
1660         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1661         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1662         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1663                 applyState.getReplicatedLogEntry().getData());
1664
1665         applyState = applyStateList.get(1);
1666         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1667         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1668         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1669                 applyState.getReplicatedLogEntry().getData());
1670
1671         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1672         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1673         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1674     }
1675
1676     @Test
1677     public void testHandleAppendEntriesReplyWithNewerTerm() {
1678         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1679
1680         MockRaftActorContext leaderActorContext = createActorContext();
1681         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1682                 new FiniteDuration(10000, TimeUnit.SECONDS));
1683
1684         leaderActorContext.setReplicatedLog(
1685                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1686
1687         leader = new Leader(leaderActorContext);
1688         leaderActor.underlyingActor().setBehavior(leader);
1689         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1690
1691         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1692                 AppendEntriesReply.class);
1693
1694         assertEquals(false, appendEntriesReply.isSuccess());
1695         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1696
1697         MessageCollectorActor.clearMessages(leaderActor);
1698     }
1699
1700     @Test
1701     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1702         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1703
1704         MockRaftActorContext leaderActorContext = createActorContext();
1705         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1706                 new FiniteDuration(10000, TimeUnit.SECONDS));
1707
1708         leaderActorContext.setReplicatedLog(
1709                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1710         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1711
1712         leader = new Leader(leaderActorContext);
1713         leaderActor.underlyingActor().setBehavior(leader);
1714         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1715
1716         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1717                 AppendEntriesReply.class);
1718
1719         assertEquals(false, appendEntriesReply.isSuccess());
1720         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1721
1722         MessageCollectorActor.clearMessages(leaderActor);
1723     }
1724
1725     @Test
1726     public void testHandleAppendEntriesReplySuccess() {
1727         logStart("testHandleAppendEntriesReplySuccess");
1728
1729         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1730
1731         leaderActorContext.setReplicatedLog(
1732                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1733
1734         leaderActorContext.setCommitIndex(1);
1735         leaderActorContext.setLastApplied(1);
1736         leaderActorContext.getTermInformation().update(1, "leader");
1737
1738         leader = new Leader(leaderActorContext);
1739
1740         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1741
1742         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1743         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1744
1745         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1746
1747         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1748
1749         assertEquals(RaftState.Leader, raftActorBehavior.state());
1750
1751         assertEquals(2, leaderActorContext.getCommitIndex());
1752
1753         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1754                 leaderActor, ApplyJournalEntries.class);
1755
1756         assertEquals(2, leaderActorContext.getLastApplied());
1757
1758         assertEquals(2, applyJournalEntries.getToIndex());
1759
1760         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1761                 ApplyState.class);
1762
1763         assertEquals(1,applyStateList.size());
1764
1765         ApplyState applyState = applyStateList.get(0);
1766
1767         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1768
1769         assertEquals(2, followerInfo.getMatchIndex());
1770         assertEquals(3, followerInfo.getNextIndex());
1771         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1772         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1773     }
1774
1775     @Test
1776     public void testHandleAppendEntriesReplyUnknownFollower() {
1777         logStart("testHandleAppendEntriesReplyUnknownFollower");
1778
1779         MockRaftActorContext leaderActorContext = createActorContext();
1780
1781         leader = new Leader(leaderActorContext);
1782
1783         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1784
1785         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1786
1787         assertEquals(RaftState.Leader, raftActorBehavior.state());
1788     }
1789
1790     @Test
1791     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1792         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1793
1794         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1795         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1796                 new FiniteDuration(1000, TimeUnit.SECONDS));
1797         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1798
1799         leaderActorContext.setReplicatedLog(
1800                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1801         long leaderCommitIndex = 3;
1802         leaderActorContext.setCommitIndex(leaderCommitIndex);
1803         leaderActorContext.setLastApplied(leaderCommitIndex);
1804
1805         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1806         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1807         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1808         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1809
1810         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1811
1812         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1813         followerActorContext.setCommitIndex(-1);
1814         followerActorContext.setLastApplied(-1);
1815
1816         Follower follower = new Follower(followerActorContext);
1817         followerActor.underlyingActor().setBehavior(follower);
1818         followerActorContext.setCurrentBehavior(follower);
1819
1820         leader = new Leader(leaderActorContext);
1821
1822         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1823         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1824                 AppendEntriesReply.class);
1825
1826         MessageCollectorActor.clearMessages(followerActor);
1827         MessageCollectorActor.clearMessages(leaderActor);
1828
1829         // Verify initial AppendEntries sent with the leader's current commit index.
1830         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1831         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1832         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1833
1834         leaderActor.underlyingActor().setBehavior(leader);
1835         leaderActorContext.setCurrentBehavior(leader);
1836
1837         leader.handleMessage(followerActor, appendEntriesReply);
1838
1839         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1840                 AppendEntries.class, 2);
1841         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1842
1843         appendEntries = appendEntriesList.get(0);
1844         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1845         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1846         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1847
1848         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1849         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1850                 appendEntries.getEntries().get(0).getData());
1851         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1852         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1853                 appendEntries.getEntries().get(1).getData());
1854
1855         appendEntries = appendEntriesList.get(1);
1856         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1857         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1858         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1859
1860         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1861         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1862                 appendEntries.getEntries().get(0).getData());
1863         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1864         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1865                 appendEntries.getEntries().get(1).getData());
1866
1867         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1868         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1869
1870         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1871
1872         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1873         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1874     }
1875
1876     @Test
1877     public void testHandleRequestVoteReply() {
1878         logStart("testHandleRequestVoteReply");
1879
1880         MockRaftActorContext leaderActorContext = createActorContext();
1881
1882         leader = new Leader(leaderActorContext);
1883
1884         // Should be a no-op.
1885         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1886                 new RequestVoteReply(1, true));
1887
1888         assertEquals(RaftState.Leader, raftActorBehavior.state());
1889
1890         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1891
1892         assertEquals(RaftState.Leader, raftActorBehavior.state());
1893     }
1894
1895     @Test
1896     public void testIsolatedLeaderCheckNoFollowers() {
1897         logStart("testIsolatedLeaderCheckNoFollowers");
1898
1899         MockRaftActorContext leaderActorContext = createActorContext();
1900
1901         leader = new Leader(leaderActorContext);
1902         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1903         assertTrue(newBehavior instanceof Leader);
1904     }
1905
1906     @Test
1907     public void testIsolatedLeaderCheckNoVotingFollowers() {
1908         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1909
1910         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1911         Follower follower = new Follower(followerActorContext);
1912         followerActor.underlyingActor().setBehavior(follower);
1913
1914         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1915         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1916                 new FiniteDuration(1000, TimeUnit.SECONDS));
1917         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1918
1919         leader = new Leader(leaderActorContext);
1920         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1921         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1922         assertTrue("Expected Leader", newBehavior instanceof Leader);
1923     }
1924
1925     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1926         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1927         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1928
1929         MockRaftActorContext leaderActorContext = createActorContext();
1930
1931         Map<String, String> peerAddresses = new HashMap<>();
1932         peerAddresses.put("follower-1", followerActor1.path().toString());
1933         peerAddresses.put("follower-2", followerActor2.path().toString());
1934
1935         leaderActorContext.setPeerAddresses(peerAddresses);
1936         leaderActorContext.setRaftPolicy(raftPolicy);
1937
1938         leader = new Leader(leaderActorContext);
1939
1940         leader.markFollowerActive("follower-1");
1941         leader.markFollowerActive("follower-2");
1942         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1944
1945         // kill 1 follower and verify if that got killed
1946         final TestKit probe = new TestKit(getSystem());
1947         probe.watch(followerActor1);
1948         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1949         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1950         assertEquals(termMsg1.getActor(), followerActor1);
1951
1952         leader.markFollowerInActive("follower-1");
1953         leader.markFollowerActive("follower-2");
1954         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1955         assertTrue("Behavior not instance of Leader when majority of followers are active",
1956                 newBehavior instanceof Leader);
1957
1958         // kill 2nd follower and leader should change to Isolated leader
1959         followerActor2.tell(PoisonPill.getInstance(), null);
1960         probe.watch(followerActor2);
1961         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1962         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1963         assertEquals(termMsg2.getActor(), followerActor2);
1964
1965         leader.markFollowerInActive("follower-2");
1966         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1967     }
1968
1969     @Test
1970     public void testIsolatedLeaderCheckTwoFollowers() {
1971         logStart("testIsolatedLeaderCheckTwoFollowers");
1972
1973         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1974
1975         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1976             newBehavior instanceof IsolatedLeader);
1977     }
1978
1979     @Test
1980     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1981         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1982
1983         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1984
1985         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1986                 newBehavior instanceof Leader);
1987     }
1988
1989     @Test
1990     public void testLaggingFollowerStarvation() {
1991         logStart("testLaggingFollowerStarvation");
1992
1993         String leaderActorId = actorFactory.generateActorId("leader");
1994         String follower1ActorId = actorFactory.generateActorId("follower");
1995         String follower2ActorId = actorFactory.generateActorId("follower");
1996
1997         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1998         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1999
2000         MockRaftActorContext leaderActorContext =
2001                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
2002
2003         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
2004         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
2005         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
2006
2007         leaderActorContext.setConfigParams(configParams);
2008
2009         leaderActorContext.setReplicatedLog(
2010                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2011
2012         Map<String, String> peerAddresses = new HashMap<>();
2013         peerAddresses.put(follower1ActorId,
2014                 follower1Actor.path().toString());
2015         peerAddresses.put(follower2ActorId,
2016                 follower2Actor.path().toString());
2017
2018         leaderActorContext.setPeerAddresses(peerAddresses);
2019         leaderActorContext.getTermInformation().update(1, leaderActorId);
2020
2021         leader = createBehavior(leaderActorContext);
2022
2023         leaderActor.underlyingActor().setBehavior(leader);
2024
2025         for (int i = 1; i < 6; i++) {
2026             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2027             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2028                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2029             assertTrue(newBehavior == leader);
2030             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2031         }
2032
2033         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2034         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2035
2036         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2037                 heartbeats.size() > 1);
2038
2039         // Check if follower-2 got AppendEntries during this time and was not starved
2040         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2041
2042         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2043                 appendEntries.size() > 1);
2044     }
2045
2046     @Test
2047     public void testReplicationConsensusWithNonVotingFollower() {
2048         logStart("testReplicationConsensusWithNonVotingFollower");
2049
2050         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2052                 new FiniteDuration(1000, TimeUnit.SECONDS));
2053
2054         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2055         leaderActorContext.setCommitIndex(-1);
2056         leaderActorContext.setLastApplied(-1);
2057
2058         String nonVotingFollowerId = "nonvoting-follower";
2059         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2060                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2061
2062         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2063                 VotingState.NON_VOTING);
2064
2065         leader = new Leader(leaderActorContext);
2066         leaderActorContext.setCurrentBehavior(leader);
2067
2068         // Ignore initial heartbeats
2069         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2071
2072         MessageCollectorActor.clearMessages(followerActor);
2073         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2074         MessageCollectorActor.clearMessages(leaderActor);
2075
2076         // Send a Replicate message and wait for AppendEntries.
2077         sendReplicate(leaderActorContext, 0);
2078
2079         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2080         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2081
2082         // Send reply only from the voting follower and verify consensus via ApplyState.
2083         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2084
2085         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2086
2087         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2088
2089         MessageCollectorActor.clearMessages(followerActor);
2090         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2091         MessageCollectorActor.clearMessages(leaderActor);
2092
2093         // Send another Replicate message
2094         sendReplicate(leaderActorContext, 1);
2095
2096         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2097         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2098                 AppendEntries.class);
2099         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2100         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2101
2102         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2103         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2104
2105         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2106
2107         // Send reply from the voting follower and verify consensus.
2108         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2109
2110         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2111     }
2112
2113     @Test
2114     public void testTransferLeadershipWithFollowerInSync() {
2115         logStart("testTransferLeadershipWithFollowerInSync");
2116
2117         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2118         leaderActorContext.setLastApplied(-1);
2119         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2120                 new FiniteDuration(1000, TimeUnit.SECONDS));
2121         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2122
2123         leader = new Leader(leaderActorContext);
2124         leaderActorContext.setCurrentBehavior(leader);
2125
2126         // Initial heartbeat
2127         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2128         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2129         MessageCollectorActor.clearMessages(followerActor);
2130
2131         sendReplicate(leaderActorContext, 0);
2132         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133
2134         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2135         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2136         MessageCollectorActor.clearMessages(followerActor);
2137
2138         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2139         leader.transferLeadership(mockTransferCohort);
2140
2141         verify(mockTransferCohort, never()).transferComplete();
2142         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2145
2146         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2147         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2148
2149         // Leader should force an election timeout
2150         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2151
2152         verify(mockTransferCohort).transferComplete();
2153     }
2154
2155     @Test
2156     public void testTransferLeadershipWithEmptyLog() {
2157         logStart("testTransferLeadershipWithEmptyLog");
2158
2159         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2160         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2161                 new FiniteDuration(1000, TimeUnit.SECONDS));
2162         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2163
2164         leader = new Leader(leaderActorContext);
2165         leaderActorContext.setCurrentBehavior(leader);
2166
2167         // Initial heartbeat
2168         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2169         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2170         MessageCollectorActor.clearMessages(followerActor);
2171
2172         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2173         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2174         leader.transferLeadership(mockTransferCohort);
2175
2176         verify(mockTransferCohort, never()).transferComplete();
2177         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2179
2180         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2181         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2182
2183         // Leader should force an election timeout
2184         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2185
2186         verify(mockTransferCohort).transferComplete();
2187     }
2188
2189     @Test
2190     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2191         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2192
2193         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2194         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2195                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2196
2197         leader = new Leader(leaderActorContext);
2198         leaderActorContext.setCurrentBehavior(leader);
2199
2200         // Initial heartbeat
2201         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2202         MessageCollectorActor.clearMessages(followerActor);
2203
2204         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2205         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2206         leader.transferLeadership(mockTransferCohort);
2207
2208         verify(mockTransferCohort, never()).transferComplete();
2209
2210         // Sync up the follower.
2211         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2212         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2213         MessageCollectorActor.clearMessages(followerActor);
2214
2215         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2216                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2217         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2218         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2219         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2220
2221         // Leader should force an election timeout
2222         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2223
2224         verify(mockTransferCohort).transferComplete();
2225     }
2226
2227     @Test
2228     public void testTransferLeadershipWithFollowerSyncTimeout() {
2229         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2230
2231         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2232         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2233                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2234         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2235         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2236
2237         leader = new Leader(leaderActorContext);
2238         leaderActorContext.setCurrentBehavior(leader);
2239
2240         // Initial heartbeat
2241         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2242         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2243         MessageCollectorActor.clearMessages(followerActor);
2244
2245         sendReplicate(leaderActorContext, 0);
2246         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2247
2248         MessageCollectorActor.clearMessages(followerActor);
2249
2250         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2251         leader.transferLeadership(mockTransferCohort);
2252
2253         verify(mockTransferCohort, never()).transferComplete();
2254
2255         // Send heartbeats to time out the transfer.
2256         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2257             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2258                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2259             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2260         }
2261
2262         verify(mockTransferCohort).abortTransfer();
2263         verify(mockTransferCohort, never()).transferComplete();
2264         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2265     }
2266
2267     @Test
2268     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2269         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2270
2271         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2272                 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2273                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2274         final MockRaftActorContext.MockPayload largePayload =
2275                 new MockRaftActorContext.MockPayload("large", serializedSize);
2276
2277         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2278         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2279                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2280         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2281         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2282         leaderActorContext.setCommitIndex(-1);
2283         leaderActorContext.setLastApplied(-1);
2284
2285         leader = new Leader(leaderActorContext);
2286         leaderActorContext.setCurrentBehavior(leader);
2287
2288         // Send initial heartbeat reply so follower is marked active
2289         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2290         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2291         MessageCollectorActor.clearMessages(followerActor);
2292
2293         // Send normal payload first to prime commit index.
2294         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2295         sendReplicate(leaderActorContext, term, 0);
2296
2297         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2298         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2299         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2300
2301         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2302         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2303         MessageCollectorActor.clearMessages(followerActor);
2304
2305         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2306         sendReplicate(leaderActorContext, term, 1, largePayload);
2307
2308         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2309         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2310         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2311
2312         final Identifier slicingId = messageSlice.getIdentifier();
2313
2314         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2315         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2316         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2317         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2318         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2319         MessageCollectorActor.clearMessages(followerActor);
2320
2321         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2322
2323         // Sleep for the heartbeat interval so AppendEntries is sent.
2324         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2325                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2326
2327         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2328
2329         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2330         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2331         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2332         MessageCollectorActor.clearMessages(followerActor);
2333
2334         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2335
2336         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2337         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2338         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2339
2340         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2341
2342         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2343
2344         MessageCollectorActor.clearMessages(followerActor);
2345
2346         // Send another normal payload.
2347
2348         sendReplicate(leaderActorContext, term, 2);
2349
2350         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2351         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2352         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2353         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2354     }
2355
2356     @Test
2357     public void testLargePayloadSlicingExpiration() {
2358         logStart("testLargePayloadSlicingExpiration");
2359
2360         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2361         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2362                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2363         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2364         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2365         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2366         leaderActorContext.setCommitIndex(-1);
2367         leaderActorContext.setLastApplied(-1);
2368
2369         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2370         leader = new Leader(leaderActorContext);
2371         leaderActorContext.setCurrentBehavior(leader);
2372
2373         // Send initial heartbeat reply so follower is marked active
2374         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2375         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2376         MessageCollectorActor.clearMessages(followerActor);
2377
2378         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2379                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2380         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2381
2382         // Sleep for at least 3 * election timeout so the slicing state expires.
2383         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2384                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2385         MessageCollectorActor.clearMessages(followerActor);
2386
2387         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2388
2389         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2390         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2391         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2392
2393         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2394         MessageCollectorActor.clearMessages(followerActor);
2395
2396         // Send an AppendEntriesReply - this should restart the slicing.
2397
2398         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2399                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2400
2401         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2402
2403         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2404     }
2405
2406     @Test
2407     public void testLeaderAddressInAppendEntries() {
2408         logStart("testLeaderAddressInAppendEntries");
2409
2410         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2411         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2412                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2413         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2414         leaderActorContext.setCommitIndex(-1);
2415         leaderActorContext.setLastApplied(-1);
2416
2417         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2418             peerId -> leaderActor.path().toString());
2419
2420         leader = new Leader(leaderActorContext);
2421         leaderActorContext.setCurrentBehavior(leader);
2422
2423         // Initial heartbeat shouldn't have the leader address
2424
2425         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2426         assertFalse(appendEntries.getLeaderAddress().isPresent());
2427         MessageCollectorActor.clearMessages(followerActor);
2428
2429         // Send AppendEntriesReply indicating the follower needs the leader address
2430
2431         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2432                 RaftVersions.CURRENT_VERSION));
2433
2434         // Sleep for the heartbeat interval so AppendEntries is sent.
2435         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2436                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2437
2438         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2439
2440         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2441         assertTrue(appendEntries.getLeaderAddress().isPresent());
2442         assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2443         MessageCollectorActor.clearMessages(followerActor);
2444
2445         // Send AppendEntriesReply indicating the follower does not need the leader address
2446
2447         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2448                 RaftVersions.CURRENT_VERSION));
2449
2450         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2451                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2452
2453         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2454
2455         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2456         assertFalse(appendEntries.getLeaderAddress().isPresent());
2457     }
2458
2459     @Override
2460     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2461             final ActorRef actorRef, final RaftRPC rpc) {
2462         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2463         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2464     }
2465
2466     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2467
2468         private final long electionTimeOutIntervalMillis;
2469         private final int snapshotChunkSize;
2470
2471         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2472             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2473             this.snapshotChunkSize = snapshotChunkSize;
2474         }
2475
2476         @Override
2477         public FiniteDuration getElectionTimeOutInterval() {
2478             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2479         }
2480
2481         @Override
2482         public int getSnapshotChunkSize() {
2483             return snapshotChunkSize;
2484         }
2485     }
2486 }