Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.io.OutputStream;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Optional;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
69 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
70 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
75 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
83
// Peer id under which the follower is addressed in these tests; it is used when building
// AppendEntriesReply messages and when looking up the follower's state on the leader.
static final String FOLLOWER_ID = "follower";
// Id for the leader side; presumably consumed by actor-context helpers outside this view -- TODO confirm.
public static final String LEADER_ID = "leader";

// Test actor standing in for the leader: it is passed as the sender of SendHeartBeat and Replicate
// messages and is inspected for ApplyState messages.
private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
        Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

// Test actor standing in for the follower: the tests inspect it for the AppendEntries and
// InstallSnapshot messages sent by the leader.
private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
        Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

// Leader behavior under test; closed in tearDown when a test created one.
private Leader leader;
// Payload version set on the actor context and expected back in the AppendEntries messages.
private final short payloadVersion = 5;
95
96     @Override
97     @After
98     public void tearDown() {
99         if (leader != null) {
100             leader.close();
101         }
102
103         super.tearDown();
104     }
105
106     @Test
107     public void testHandleMessageForUnknownMessage() {
108         logStart("testHandleMessageForUnknownMessage");
109
110         leader = new Leader(createActorContext());
111
112         // handle message should null when it receives an unknown message
113         assertNull(leader.handleMessage(followerActor, "foo"));
114     }
115
116     @Test
117     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
119
120         MockRaftActorContext actorContext = createActorContextWithFollower();
121         actorContext.setCommitIndex(-1);
122         actorContext.setPayloadVersion(payloadVersion);
123
124         long term = 1;
125         actorContext.getTermInformation().update(term, "");
126
127         leader = new Leader(actorContext);
128         actorContext.setCurrentBehavior(leader);
129
130         // Leader should send an immediate heartbeat with no entries as follower is inactive.
131         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133         assertEquals("getTerm", term, appendEntries.getTerm());
134         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136         assertEquals("Entries size", 0, appendEntries.getEntries().size());
137         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
138
139         // The follower would normally reply - simulate that explicitly here.
140         leader.handleMessage(followerActor, new AppendEntriesReply(
141                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
143
144         followerActor.underlyingActor().clear();
145
146         // Sleep for the heartbeat interval so AppendEntries is sent.
147         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
149
150         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
151
152         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155         assertEquals("Entries size", 1, appendEntries.getEntries().size());
156         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
159     }
160
161
162     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163         return sendReplicate(actorContext, 1, index);
164     }
165
166     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
167             final long index) {
168         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
169     }
170
171     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
172             final Payload payload) {
173         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
174         actorContext.getReplicatedLog().append(newEntry);
175         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
176     }
177
178     @Test
179     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
180         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
181
182         MockRaftActorContext actorContext = createActorContextWithFollower();
183
184         long term = 1;
185         actorContext.getTermInformation().update(term, "");
186
187         leader = new Leader(actorContext);
188
189         // Leader will send an immediate heartbeat - ignore it.
190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191
192         // The follower would normally reply - simulate that explicitly here.
193         long lastIndex = actorContext.getReplicatedLog().lastIndex();
194         leader.handleMessage(followerActor, new AppendEntriesReply(
195                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
196         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197
198         followerActor.underlyingActor().clear();
199
200         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
201
202         // State should not change
203         assertTrue(raftBehavior instanceof Leader);
204
205         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
206         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
207         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
208         assertEquals("Entries size", 1, appendEntries.getEntries().size());
209         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
210         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
211         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
212         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
213     }
214
215     @Test
216     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
217         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
218
219         MockRaftActorContext actorContext = createActorContextWithFollower();
220         actorContext.setCommitIndex(-1);
221         actorContext.setLastApplied(-1);
222
223         // The raft context is initialized with a couple log entries. However the commitIndex
224         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
225         // committed and applied. Now it regains leadership with a higher term (2).
226         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
227         long newTerm = prevTerm + 1;
228         actorContext.getTermInformation().update(newTerm, "");
229
230         leader = new Leader(actorContext);
231         actorContext.setCurrentBehavior(leader);
232
233         // Leader will send an immediate heartbeat - ignore it.
234         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
235
236         // The follower replies with the leader's current last index and term, simulating that it is
237         // up to date with the leader.
238         long lastIndex = actorContext.getReplicatedLog().lastIndex();
239         leader.handleMessage(followerActor, new AppendEntriesReply(
240                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
241
242         // The commit index should not get updated even though consensus was reached. This is b/c the
243         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
244         // from previous terms by counting replicas".
245         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
246
247         followerActor.underlyingActor().clear();
248
249         // Now replicate a new entry with the new term 2.
250         long newIndex = lastIndex + 1;
251         sendReplicate(actorContext, newTerm, newIndex);
252
253         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
254         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
255         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
256         assertEquals("Entries size", 1, appendEntries.getEntries().size());
257         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
258         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
259         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
260
261         // The follower replies with success. The leader should now update the commit index to the new index
262         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
263         // prior entries are committed indirectly".
264         leader.handleMessage(followerActor, new AppendEntriesReply(
265                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
266
267         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
268     }
269
270     @Test
271     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
272         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
273
274         MockRaftActorContext actorContext = createActorContextWithFollower();
275         actorContext.setRaftPolicy(createRaftPolicy(true, true));
276
277         long term = 1;
278         actorContext.getTermInformation().update(term, "");
279
280         leader = new Leader(actorContext);
281
282         // Leader will send an immediate heartbeat - ignore it.
283         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
284
285         // The follower would normally reply - simulate that explicitly here.
286         long lastIndex = actorContext.getReplicatedLog().lastIndex();
287         leader.handleMessage(followerActor, new AppendEntriesReply(
288                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
289         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
290
291         followerActor.underlyingActor().clear();
292
293         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
294
295         // State should not change
296         assertTrue(raftBehavior instanceof Leader);
297
298         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
299         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
300         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
301         assertEquals("Entries size", 1, appendEntries.getEntries().size());
302         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
303         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
304         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
305         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
306     }
307
308     @Test
309     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
310         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
311
312         MockRaftActorContext actorContext = createActorContextWithFollower();
313         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
314             @Override
315             public FiniteDuration getHeartBeatInterval() {
316                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
317             }
318         });
319
320         long term = 1;
321         actorContext.getTermInformation().update(term, "");
322
323         leader = new Leader(actorContext);
324
325         // Leader will send an immediate heartbeat - ignore it.
326         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
327
328         // The follower would normally reply - simulate that explicitly here.
329         long lastIndex = actorContext.getReplicatedLog().lastIndex();
330         leader.handleMessage(followerActor, new AppendEntriesReply(
331                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
332         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
333
334         followerActor.underlyingActor().clear();
335
336         for (int i = 0; i < 5; i++) {
337             sendReplicate(actorContext, lastIndex + i + 1);
338         }
339
340         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341         // We expect only 1 message to be sent because of two reasons,
342         // - an append entries reply was not received
343         // - the heartbeat interval has not expired
344         // In this scenario if multiple messages are sent they would likely be duplicates
345         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
346     }
347
348     @Test
349     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
350         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
351
352         MockRaftActorContext actorContext = createActorContextWithFollower();
353         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
354             @Override
355             public FiniteDuration getHeartBeatInterval() {
356                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
357             }
358         });
359
360         long term = 1;
361         actorContext.getTermInformation().update(term, "");
362
363         leader = new Leader(actorContext);
364
365         // Leader will send an immediate heartbeat - ignore it.
366         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
367
368         // The follower would normally reply - simulate that explicitly here.
369         long lastIndex = actorContext.getReplicatedLog().lastIndex();
370         leader.handleMessage(followerActor, new AppendEntriesReply(
371                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
372         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
373
374         followerActor.underlyingActor().clear();
375
376         for (int i = 0; i < 3; i++) {
377             sendReplicate(actorContext, lastIndex + i + 1);
378             leader.handleMessage(followerActor, new AppendEntriesReply(
379                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
380         }
381
382         // We are expecting six messages here -- a request to replicate and a consensus-reached message
383         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
384         assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
385         for (int i = 0; i < 3; i++) {
386             assertRequestEntry(lastIndex, allMessages, i);
387             assertCommitEntry(lastIndex, allMessages, i);
388         }
389
390         // Now perform another commit, eliciting a request to persist
391         sendReplicate(actorContext, lastIndex + 3 + 1);
392         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393         // This elicits another message for request to replicate
394         assertEquals("The number of request entries collected", 7, allMessages.size());
395         assertRequestEntry(lastIndex, allMessages, 3);
396
397         sendReplicate(actorContext, lastIndex + 4 + 1);
398         allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
399         assertEquals("The number of request entries collected", 7, allMessages.size());
400     }
401
402     private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
403             final int messageNr) {
404         final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
405         assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
406         assertEquals(ImmutableList.of(), commitReq.getEntries());
407     }
408
409     private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
410             final int messageNr) {
411         final AppendEntries req = allMessages.get(2 * messageNr);
412         assertEquals(lastIndex + messageNr, req.getLeaderCommit());
413
414         final List<ReplicatedLogEntry> entries = req.getEntries();
415         assertEquals(1, entries.size());
416         assertEquals(messageNr + 2, entries.get(0).getIndex());
417     }
418
419     @Test
420     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
421         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
422
423         MockRaftActorContext actorContext = createActorContextWithFollower();
424         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
425             @Override
426             public FiniteDuration getHeartBeatInterval() {
427                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
428             }
429         });
430
431         long term = 1;
432         actorContext.getTermInformation().update(term, "");
433
434         leader = new Leader(actorContext);
435
436         // Leader will send an immediate heartbeat - ignore it.
437         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
438
439         // The follower would normally reply - simulate that explicitly here.
440         long lastIndex = actorContext.getReplicatedLog().lastIndex();
441         leader.handleMessage(followerActor, new AppendEntriesReply(
442                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
443         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
444
445         followerActor.underlyingActor().clear();
446
447         sendReplicate(actorContext, lastIndex + 1);
448
449         // Wait slightly longer than heartbeat duration
450         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
451
452         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
453
454         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
455         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
456
457         assertEquals(1, allMessages.get(0).getEntries().size());
458         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
459         assertEquals(1, allMessages.get(1).getEntries().size());
460         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
461
462     }
463
464     @Test
465     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
466         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
467
468         MockRaftActorContext actorContext = createActorContextWithFollower();
469         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470             @Override
471             public FiniteDuration getHeartBeatInterval() {
472                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
473             }
474         });
475
476         long term = 1;
477         actorContext.getTermInformation().update(term, "");
478
479         leader = new Leader(actorContext);
480
481         // Leader will send an immediate heartbeat - ignore it.
482         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483
484         // The follower would normally reply - simulate that explicitly here.
485         long lastIndex = actorContext.getReplicatedLog().lastIndex();
486         leader.handleMessage(followerActor, new AppendEntriesReply(
487                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489
490         followerActor.underlyingActor().clear();
491
492         for (int i = 0; i < 3; i++) {
493             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
494             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
495         }
496
497         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
498         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
499     }
500
501     @Test
502     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
503         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
504
505         MockRaftActorContext actorContext = createActorContextWithFollower();
506         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
507             @Override
508             public FiniteDuration getHeartBeatInterval() {
509                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
510             }
511         });
512
513         long term = 1;
514         actorContext.getTermInformation().update(term, "");
515
516         leader = new Leader(actorContext);
517
518         // Leader will send an immediate heartbeat - ignore it.
519         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
520
521         // The follower would normally reply - simulate that explicitly here.
522         long lastIndex = actorContext.getReplicatedLog().lastIndex();
523         leader.handleMessage(followerActor, new AppendEntriesReply(
524                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
525         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
526
527         followerActor.underlyingActor().clear();
528
529         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
530         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
531         sendReplicate(actorContext, lastIndex + 1);
532
533         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
534         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
535
536         assertEquals(0, allMessages.get(0).getEntries().size());
537         assertEquals(1, allMessages.get(1).getEntries().size());
538     }
539
540
541     @Test
542     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
543         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
544
545         MockRaftActorContext actorContext = createActorContext();
546
547         leader = new Leader(actorContext);
548
549         actorContext.setLastApplied(0);
550
551         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
552         long term = actorContext.getTermInformation().getCurrentTerm();
553         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
554                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
555
556         actorContext.getReplicatedLog().append(newEntry);
557
558         final Identifier id = new MockIdentifier("state-id");
559         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
560                 new Replicate(leaderActor, id, newEntry, true));
561
562         // State should not change
563         assertTrue(raftBehavior instanceof Leader);
564
565         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
566
567         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
568         // one since lastApplied state is 0.
569         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
570                 leaderActor, ApplyState.class);
571         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
572
573         for (int i = 0; i <= newLogIndex - 1; i++) {
574             ApplyState applyState = applyStateList.get(i);
575             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
576             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
577         }
578
579         ApplyState last = applyStateList.get((int) newLogIndex - 1);
580         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
581         assertEquals("getIdentifier", id, last.getIdentifier());
582     }
583
584     @Test
585     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
586         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
587
588         final MockRaftActorContext actorContext = createActorContextWithFollower();
589
590         Map<String, String> leadersSnapshot = new HashMap<>();
591         leadersSnapshot.put("1", "A");
592         leadersSnapshot.put("2", "B");
593         leadersSnapshot.put("3", "C");
594
595         //clears leaders log
596         actorContext.getReplicatedLog().removeFrom(0);
597
598         final int commitIndex = 3;
599         final int snapshotIndex = 2;
600         final int snapshotTerm = 1;
601
602         // set the snapshot variables in replicatedlog
603         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
604         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
605         actorContext.setCommitIndex(commitIndex);
606         //set follower timeout to 2 mins, helps during debugging
607         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
608
609         leader = new Leader(actorContext);
610
611         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
612         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
613
614         //update follower timestamp
615         leader.markFollowerActive(FOLLOWER_ID);
616
617         ByteString bs = toByteString(leadersSnapshot);
618         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
619                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
620                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
621         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
622                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
623         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
624         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
625
626         //send first chunk and no InstallSnapshotReply received yet
627         fts.getNextChunk();
628         fts.incrementChunkIndex();
629
630         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
631                 TimeUnit.MILLISECONDS);
632
633         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
634
635         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
636
637         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
638
639         //InstallSnapshotReply received
640         fts.markSendStatus(true);
641
642         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
643
644         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
645
646         assertEquals(commitIndex, is.getLastIncludedIndex());
647     }
648
649     @Test
650     public void testSendAppendEntriesSnapshotScenario() {
651         logStart("testSendAppendEntriesSnapshotScenario");
652
653         final MockRaftActorContext actorContext = createActorContextWithFollower();
654
655         Map<String, String> leadersSnapshot = new HashMap<>();
656         leadersSnapshot.put("1", "A");
657         leadersSnapshot.put("2", "B");
658         leadersSnapshot.put("3", "C");
659
660         //clears leaders log
661         actorContext.getReplicatedLog().removeFrom(0);
662
663         final int followersLastIndex = 2;
664         final int snapshotIndex = 3;
665         final int newEntryIndex = 4;
666         final int snapshotTerm = 1;
667         final int currentTerm = 2;
668
669         // set the snapshot variables in replicatedlog
670         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
671         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
672         actorContext.setCommitIndex(followersLastIndex);
673
674         leader = new Leader(actorContext);
675
676         // Leader will send an immediate heartbeat - ignore it.
677         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
678
679         // new entry
680         SimpleReplicatedLogEntry entry =
681                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
682                         new MockRaftActorContext.MockPayload("D"));
683
684         actorContext.getReplicatedLog().append(entry);
685
686         //update follower timestamp
687         leader.markFollowerActive(FOLLOWER_ID);
688
689         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
690         RaftActorBehavior raftBehavior = leader.handleMessage(
691                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
692
693         assertTrue(raftBehavior instanceof Leader);
694
695         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
696     }
697
698     @Test
699     public void testInitiateInstallSnapshot() {
700         logStart("testInitiateInstallSnapshot");
701
702         MockRaftActorContext actorContext = createActorContextWithFollower();
703
704         //clears leaders log
705         actorContext.getReplicatedLog().removeFrom(0);
706
707         final int followersLastIndex = 2;
708         final int snapshotIndex = 3;
709         final int newEntryIndex = 4;
710         final int snapshotTerm = 1;
711         final int currentTerm = 2;
712
713         // set the snapshot variables in replicatedlog
714         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
715         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
716         actorContext.setLastApplied(3);
717         actorContext.setCommitIndex(followersLastIndex);
718
719         leader = new Leader(actorContext);
720
721         // Leader will send an immediate heartbeat - ignore it.
722         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
723
724         // set the snapshot as absent and check if capture-snapshot is invoked.
725         leader.setSnapshotHolder(null);
726
727         // new entry
728         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
729                 new MockRaftActorContext.MockPayload("D"));
730
731         actorContext.getReplicatedLog().append(entry);
732
733         //update follower timestamp
734         leader.markFollowerActive(FOLLOWER_ID);
735
736         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
737
738         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
739
740         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
741
742         assertEquals(3, cs.getLastAppliedIndex());
743         assertEquals(1, cs.getLastAppliedTerm());
744         assertEquals(4, cs.getLastIndex());
745         assertEquals(2, cs.getLastTerm());
746
747         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
748         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
749
750         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
751     }
752
753     @Test
754     public void testInitiateForceInstallSnapshot() throws Exception {
755         logStart("testInitiateForceInstallSnapshot");
756
757         MockRaftActorContext actorContext = createActorContextWithFollower();
758
759         final int followersLastIndex = 2;
760         final int snapshotIndex = -1;
761         final int newEntryIndex = 4;
762         final int snapshotTerm = -1;
763         final int currentTerm = 2;
764
765         // set the snapshot variables in replicatedlog
766         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
767         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
768         actorContext.setLastApplied(3);
769         actorContext.setCommitIndex(followersLastIndex);
770
771         actorContext.getReplicatedLog().removeFrom(0);
772
773         AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
774         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
775
776         leader = new Leader(actorContext);
777         actorContext.setCurrentBehavior(leader);
778
779         // Leader will send an immediate heartbeat - ignore it.
780         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
781
782         // set the snapshot as absent and check if capture-snapshot is invoked.
783         leader.setSnapshotHolder(null);
784
785         for (int i = 0; i < 4; i++) {
786             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
787                     new MockRaftActorContext.MockPayload("X" + i)));
788         }
789
790         // new entry
791         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
792                 new MockRaftActorContext.MockPayload("D"));
793
794         actorContext.getReplicatedLog().append(entry);
795
796         //update follower timestamp
797         leader.markFollowerActive(FOLLOWER_ID);
798
799         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
800         // installed with a SendInstallSnapshot
801         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
802                 RaftVersions.CURRENT_VERSION));
803
804         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
805
806         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
807         assertEquals(3, cs.getLastAppliedIndex());
808         assertEquals(1, cs.getLastAppliedTerm());
809         assertEquals(4, cs.getLastIndex());
810         assertEquals(2, cs.getLastTerm());
811
812         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
813         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
814
815         MessageCollectorActor.clearMessages(followerActor);
816
817         // Sending Replicate message should not initiate another capture since the first is in progress.
818         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
819         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
820
821         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
822         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
823                 RaftVersions.CURRENT_VERSION));
824         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
825
826         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
827         final byte[] bytes = new byte[]{1, 2, 3};
828         installSnapshotStream.get().get().write(bytes);
829         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
830                 Runtime.getRuntime().totalMemory());
831         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
832
833         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
834         MessageCollectorActor.clearMessages(followerActor);
835         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
836                 RaftVersions.CURRENT_VERSION));
837         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
838     }
839
840
841     @Test
842     public void testInstallSnapshot() {
843         logStart("testInstallSnapshot");
844
845         final MockRaftActorContext actorContext = createActorContextWithFollower();
846
847         Map<String, String> leadersSnapshot = new HashMap<>();
848         leadersSnapshot.put("1", "A");
849         leadersSnapshot.put("2", "B");
850         leadersSnapshot.put("3", "C");
851
852         //clears leaders log
853         actorContext.getReplicatedLog().removeFrom(0);
854
855         final int lastAppliedIndex = 3;
856         final int snapshotIndex = 2;
857         final int snapshotTerm = 1;
858         final int currentTerm = 2;
859
860         // set the snapshot variables in replicatedlog
861         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
862         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
863         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
864         actorContext.setCommitIndex(lastAppliedIndex);
865         actorContext.setLastApplied(lastAppliedIndex);
866
867         leader = new Leader(actorContext);
868
869         // Initial heartbeat.
870         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
871
872         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
873         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
874
875         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
876         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
877                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
878
879         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
880                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
881
882         assertTrue(raftBehavior instanceof Leader);
883
884         // check if installsnapshot gets called with the correct values.
885
886         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
887                 InstallSnapshot.class);
888
889         assertNotNull(installSnapshot.getData());
890         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
891         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
892
893         assertEquals(currentTerm, installSnapshot.getTerm());
894     }
895
896     @Test
897     public void testForceInstallSnapshot() {
898         logStart("testForceInstallSnapshot");
899
900         final MockRaftActorContext actorContext = createActorContextWithFollower();
901
902         Map<String, String> leadersSnapshot = new HashMap<>();
903         leadersSnapshot.put("1", "A");
904         leadersSnapshot.put("2", "B");
905         leadersSnapshot.put("3", "C");
906
907         final int lastAppliedIndex = 3;
908         final int snapshotIndex = -1;
909         final int snapshotTerm = -1;
910         final int currentTerm = 2;
911
912         // set the snapshot variables in replicatedlog
913         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916         actorContext.setCommitIndex(lastAppliedIndex);
917         actorContext.setLastApplied(lastAppliedIndex);
918
919         leader = new Leader(actorContext);
920
921         // Initial heartbeat.
922         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
923
924         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
925         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
926
927         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
928         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
929                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
930
931         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
932                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
933
934         assertTrue(raftBehavior instanceof Leader);
935
936         // check if installsnapshot gets called with the correct values.
937
938         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
939                 InstallSnapshot.class);
940
941         assertNotNull(installSnapshot.getData());
942         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
943         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
944
945         assertEquals(currentTerm, installSnapshot.getTerm());
946     }
947
948     @Test
949     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
950         logStart("testHandleInstallSnapshotReplyLastChunk");
951
952         MockRaftActorContext actorContext = createActorContextWithFollower();
953
954         final int commitIndex = 3;
955         final int snapshotIndex = 2;
956         final int snapshotTerm = 1;
957         final int currentTerm = 2;
958
959         actorContext.setCommitIndex(commitIndex);
960
961         leader = new Leader(actorContext);
962         actorContext.setCurrentBehavior(leader);
963
964         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
965         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
966
967         // Ignore initial heartbeat.
968         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
969
970         Map<String, String> leadersSnapshot = new HashMap<>();
971         leadersSnapshot.put("1", "A");
972         leadersSnapshot.put("2", "B");
973         leadersSnapshot.put("3", "C");
974
975         // set the snapshot variables in replicatedlog
976
977         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
978         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
979         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
980
981         ByteString bs = toByteString(leadersSnapshot);
982         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
983                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
984                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
985         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
986                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
987         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
988         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
989         while (!fts.isLastChunk(fts.getChunkIndex())) {
990             fts.getNextChunk();
991             fts.incrementChunkIndex();
992         }
993
994         //clears leaders log
995         actorContext.getReplicatedLog().removeFrom(0);
996
997         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
998                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
999
1000         assertTrue(raftBehavior instanceof Leader);
1001
1002         assertEquals(1, leader.followerLogSize());
1003         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
1004         assertNotNull(fli);
1005         assertNull(fli.getInstallSnapshotState());
1006         assertEquals(commitIndex, fli.getMatchIndex());
1007         assertEquals(commitIndex + 1, fli.getNextIndex());
1008         assertFalse(leader.hasSnapshot());
1009     }
1010
1011     @Test
1012     public void testSendSnapshotfromInstallSnapshotReply() {
1013         logStart("testSendSnapshotfromInstallSnapshotReply");
1014
1015         MockRaftActorContext actorContext = createActorContextWithFollower();
1016
1017         final int commitIndex = 3;
1018         final int snapshotIndex = 2;
1019         final int snapshotTerm = 1;
1020         final int currentTerm = 2;
1021
1022         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1023             @Override
1024             public int getSnapshotChunkSize() {
1025                 return 50;
1026             }
1027         };
1028         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1029         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1030
1031         actorContext.setConfigParams(configParams);
1032         actorContext.setCommitIndex(commitIndex);
1033
1034         leader = new Leader(actorContext);
1035         actorContext.setCurrentBehavior(leader);
1036
1037         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1038         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1039
1040         Map<String, String> leadersSnapshot = new HashMap<>();
1041         leadersSnapshot.put("1", "A");
1042         leadersSnapshot.put("2", "B");
1043         leadersSnapshot.put("3", "C");
1044
1045         // set the snapshot variables in replicatedlog
1046         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1047         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1048         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1049
1050         ByteString bs = toByteString(leadersSnapshot);
1051         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1052                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1053                 -1, null, null);
1054
1055         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1056
1057         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1058                 InstallSnapshot.class);
1059
1060         assertEquals(1, installSnapshot.getChunkIndex());
1061         assertEquals(3, installSnapshot.getTotalChunks());
1062
1063         followerActor.underlyingActor().clear();
1064         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1065                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1066
1067         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1068
1069         assertEquals(2, installSnapshot.getChunkIndex());
1070         assertEquals(3, installSnapshot.getTotalChunks());
1071
1072         followerActor.underlyingActor().clear();
1073         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1074                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1075
1076         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1077
1078         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1079         followerActor.underlyingActor().clear();
1080         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1081                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1082
1083         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1084
1085         assertNull(installSnapshot);
1086     }
1087
1088
1089     @Test
1090     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1091         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1092
1093         MockRaftActorContext actorContext = createActorContextWithFollower();
1094
1095         final int commitIndex = 3;
1096         final int snapshotIndex = 2;
1097         final int snapshotTerm = 1;
1098         final int currentTerm = 2;
1099
1100         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1101             @Override
1102             public int getSnapshotChunkSize() {
1103                 return 50;
1104             }
1105         });
1106
1107         actorContext.setCommitIndex(commitIndex);
1108
1109         leader = new Leader(actorContext);
1110
1111         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1112         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1113
1114         Map<String, String> leadersSnapshot = new HashMap<>();
1115         leadersSnapshot.put("1", "A");
1116         leadersSnapshot.put("2", "B");
1117         leadersSnapshot.put("3", "C");
1118
1119         // set the snapshot variables in replicatedlog
1120         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1121         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1122         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1123
1124         ByteString bs = toByteString(leadersSnapshot);
1125         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1126                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1127                 -1, null, null);
1128
1129         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1130         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1131
1132         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1133                 InstallSnapshot.class);
1134
1135         assertEquals(1, installSnapshot.getChunkIndex());
1136         assertEquals(3, installSnapshot.getTotalChunks());
1137
1138         followerActor.underlyingActor().clear();
1139
1140         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1141                 FOLLOWER_ID, -1, false));
1142
1143         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1144                 TimeUnit.MILLISECONDS);
1145
1146         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1147
1148         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1149
1150         assertEquals(1, installSnapshot.getChunkIndex());
1151         assertEquals(3, installSnapshot.getTotalChunks());
1152     }
1153
1154     @Test
1155     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1156         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1157
1158         MockRaftActorContext actorContext = createActorContextWithFollower();
1159
1160         final int commitIndex = 3;
1161         final int snapshotIndex = 2;
1162         final int snapshotTerm = 1;
1163         final int currentTerm = 2;
1164
1165         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1166             @Override
1167             public int getSnapshotChunkSize() {
1168                 return 50;
1169             }
1170         });
1171
1172         actorContext.setCommitIndex(commitIndex);
1173
1174         leader = new Leader(actorContext);
1175
1176         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1177         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1178
1179         Map<String, String> leadersSnapshot = new HashMap<>();
1180         leadersSnapshot.put("1", "A");
1181         leadersSnapshot.put("2", "B");
1182         leadersSnapshot.put("3", "C");
1183
1184         // set the snapshot variables in replicatedlog
1185         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1186         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1187         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1188
1189         ByteString bs = toByteString(leadersSnapshot);
1190         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1191                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1192                 -1, null, null);
1193
1194         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1195
1196         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1197                 InstallSnapshot.class);
1198
1199         assertEquals(1, installSnapshot.getChunkIndex());
1200         assertEquals(3, installSnapshot.getTotalChunks());
1201         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1202                 installSnapshot.getLastChunkHashCode().getAsInt());
1203
1204         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1205
1206         followerActor.underlyingActor().clear();
1207
1208         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1209                 FOLLOWER_ID, 1, true));
1210
1211         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1212
1213         assertEquals(2, installSnapshot.getChunkIndex());
1214         assertEquals(3, installSnapshot.getTotalChunks());
1215         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
1216     }
1217
1218     @Test
1219     public void testLeaderInstallSnapshotState() throws IOException {
1220         logStart("testLeaderInstallSnapshotState");
1221
1222         Map<String, String> leadersSnapshot = new HashMap<>();
1223         leadersSnapshot.put("1", "A");
1224         leadersSnapshot.put("2", "B");
1225         leadersSnapshot.put("3", "C");
1226
1227         ByteString bs = toByteString(leadersSnapshot);
1228         byte[] barray = bs.toByteArray();
1229
1230         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1231         fts.setSnapshotBytes(ByteSource.wrap(barray));
1232
1233         assertEquals(bs.size(), barray.length);
1234
1235         int chunkIndex = 0;
1236         for (int i = 0; i < barray.length; i = i + 50) {
1237             int length = i + 50;
1238             chunkIndex++;
1239
1240             if (i + 50 > barray.length) {
1241                 length = barray.length;
1242             }
1243
1244             byte[] chunk = fts.getNextChunk();
1245             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1246             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1247
1248             fts.markSendStatus(true);
1249             if (!fts.isLastChunk(chunkIndex)) {
1250                 fts.incrementChunkIndex();
1251             }
1252         }
1253
1254         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1255         fts.close();
1256     }
1257
1258     @Override
1259     protected Leader createBehavior(final RaftActorContext actorContext) {
1260         return new Leader(actorContext);
1261     }
1262
1263     @Override
1264     protected MockRaftActorContext createActorContext() {
1265         return createActorContext(leaderActor);
1266     }
1267
1268     @Override
1269     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1270         return createActorContext(LEADER_ID, actorRef);
1271     }
1272
1273     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1274         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1275         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1276         configParams.setElectionTimeoutFactor(100000);
1277         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1278         context.setConfigParams(configParams);
1279         context.setPayloadVersion(payloadVersion);
1280         return context;
1281     }
1282
1283     private MockRaftActorContext createActorContextWithFollower() {
1284         MockRaftActorContext actorContext = createActorContext();
1285         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1286                 followerActor.path().toString()).build());
1287         return actorContext;
1288     }
1289
1290     private MockRaftActorContext createFollowerActorContextWithLeader() {
1291         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1292         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1293         followerConfig.setElectionTimeoutFactor(10000);
1294         followerActorContext.setConfigParams(followerConfig);
1295         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296         return followerActorContext;
1297     }
1298
1299     @Test
1300     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1301         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1302
1303         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1304
1305         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1306
1307         Follower follower = new Follower(followerActorContext);
1308         followerActor.underlyingActor().setBehavior(follower);
1309         followerActorContext.setCurrentBehavior(follower);
1310
1311         Map<String, String> peerAddresses = new HashMap<>();
1312         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1313
1314         leaderActorContext.setPeerAddresses(peerAddresses);
1315
1316         leaderActorContext.getReplicatedLog().removeFrom(0);
1317
1318         //create 3 entries
1319         leaderActorContext.setReplicatedLog(
1320                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1321
1322         leaderActorContext.setCommitIndex(1);
1323
1324         followerActorContext.getReplicatedLog().removeFrom(0);
1325
1326         // follower too has the exact same log entries and has the same commit index
1327         followerActorContext.setReplicatedLog(
1328                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1329
1330         followerActorContext.setCommitIndex(1);
1331
1332         leader = new Leader(leaderActorContext);
1333         leaderActorContext.setCurrentBehavior(leader);
1334
1335         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1336
1337         assertEquals(-1, appendEntries.getLeaderCommit());
1338         assertEquals(0, appendEntries.getEntries().size());
1339         assertEquals(0, appendEntries.getPrevLogIndex());
1340
1341         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1342                 leaderActor, AppendEntriesReply.class);
1343
1344         assertEquals(2, appendEntriesReply.getLogLastIndex());
1345         assertEquals(1, appendEntriesReply.getLogLastTerm());
1346
1347         // follower returns its next index
1348         assertEquals(2, appendEntriesReply.getLogLastIndex());
1349         assertEquals(1, appendEntriesReply.getLogLastTerm());
1350
1351         follower.close();
1352     }
1353
1354     @Test
1355     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1356         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1357
1358         final MockRaftActorContext leaderActorContext = createActorContext();
1359
1360         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1361         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1362
1363         Follower follower = new Follower(followerActorContext);
1364         followerActor.underlyingActor().setBehavior(follower);
1365         followerActorContext.setCurrentBehavior(follower);
1366
1367         Map<String, String> leaderPeerAddresses = new HashMap<>();
1368         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1369
1370         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1371
1372         leaderActorContext.getReplicatedLog().removeFrom(0);
1373
1374         leaderActorContext.setReplicatedLog(
1375                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1376
1377         leaderActorContext.setCommitIndex(1);
1378
1379         followerActorContext.getReplicatedLog().removeFrom(0);
1380
1381         followerActorContext.setReplicatedLog(
1382                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1383
1384         // follower has the same log entries but its commit index > leaders commit index
1385         followerActorContext.setCommitIndex(2);
1386
1387         leader = new Leader(leaderActorContext);
1388
1389         // Initial heartbeat
1390         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1391
1392         assertEquals(-1, appendEntries.getLeaderCommit());
1393         assertEquals(0, appendEntries.getEntries().size());
1394         assertEquals(0, appendEntries.getPrevLogIndex());
1395
1396         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1397                 leaderActor, AppendEntriesReply.class);
1398
1399         assertEquals(2, appendEntriesReply.getLogLastIndex());
1400         assertEquals(1, appendEntriesReply.getLogLastTerm());
1401
1402         leaderActor.underlyingActor().setBehavior(follower);
1403         leader.handleMessage(followerActor, appendEntriesReply);
1404
1405         leaderActor.underlyingActor().clear();
1406         followerActor.underlyingActor().clear();
1407
1408         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1409                 TimeUnit.MILLISECONDS);
1410
1411         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1412
1413         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1414
1415         assertEquals(2, appendEntries.getLeaderCommit());
1416         assertEquals(0, appendEntries.getEntries().size());
1417         assertEquals(2, appendEntries.getPrevLogIndex());
1418
1419         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1420
1421         assertEquals(2, appendEntriesReply.getLogLastIndex());
1422         assertEquals(1, appendEntriesReply.getLogLastTerm());
1423
1424         assertEquals(2, followerActorContext.getCommitIndex());
1425
1426         follower.close();
1427     }
1428
1429     @Test
1430     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1431         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1432
1433         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1434         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1435                 new FiniteDuration(1000, TimeUnit.SECONDS));
1436
1437         leaderActorContext.setReplicatedLog(
1438                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1439         long leaderCommitIndex = 2;
1440         leaderActorContext.setCommitIndex(leaderCommitIndex);
1441         leaderActorContext.setLastApplied(leaderCommitIndex);
1442
1443         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1444         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1445
1446         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1447
1448         followerActorContext.setReplicatedLog(
1449                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1450         followerActorContext.setCommitIndex(0);
1451         followerActorContext.setLastApplied(0);
1452
1453         Follower follower = new Follower(followerActorContext);
1454         followerActor.underlyingActor().setBehavior(follower);
1455
1456         leader = new Leader(leaderActorContext);
1457
1458         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1459         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1460                 AppendEntriesReply.class);
1461
1462         MessageCollectorActor.clearMessages(followerActor);
1463         MessageCollectorActor.clearMessages(leaderActor);
1464
1465         // Verify initial AppendEntries sent.
1466         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1467         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1468         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1469
1470         leaderActor.underlyingActor().setBehavior(leader);
1471
1472         leader.handleMessage(followerActor, appendEntriesReply);
1473
1474         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1475         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1476
1477         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1478         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1479         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1480
1481         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1482         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1483                 appendEntries.getEntries().get(0).getData());
1484         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1485         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1486                 appendEntries.getEntries().get(1).getData());
1487
1488         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1489         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1490
1491         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1492
1493         ApplyState applyState = applyStateList.get(0);
1494         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1495         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1496         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1497                 applyState.getReplicatedLogEntry().getData());
1498
1499         applyState = applyStateList.get(1);
1500         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1501         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1502         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1503                 applyState.getReplicatedLogEntry().getData());
1504
1505         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1506         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1507     }
1508
1509     @Test
1510     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1511         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1512
1513         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1514         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1515                 new FiniteDuration(1000, TimeUnit.SECONDS));
1516
1517         leaderActorContext.setReplicatedLog(
1518                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1519         long leaderCommitIndex = 1;
1520         leaderActorContext.setCommitIndex(leaderCommitIndex);
1521         leaderActorContext.setLastApplied(leaderCommitIndex);
1522
1523         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1524         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1525
1526         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1527
1528         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1529         followerActorContext.setCommitIndex(-1);
1530         followerActorContext.setLastApplied(-1);
1531
1532         Follower follower = new Follower(followerActorContext);
1533         followerActor.underlyingActor().setBehavior(follower);
1534         followerActorContext.setCurrentBehavior(follower);
1535
1536         leader = new Leader(leaderActorContext);
1537
1538         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1539         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1540                 AppendEntriesReply.class);
1541
1542         MessageCollectorActor.clearMessages(followerActor);
1543         MessageCollectorActor.clearMessages(leaderActor);
1544
1545         // Verify initial AppendEntries sent with the leader's current commit index.
1546         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1547         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1548         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1549
1550         leaderActor.underlyingActor().setBehavior(leader);
1551         leaderActorContext.setCurrentBehavior(leader);
1552
1553         leader.handleMessage(followerActor, appendEntriesReply);
1554
1555         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1556         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1557
1558         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1559         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1560         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1561
1562         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1563         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1564                 appendEntries.getEntries().get(0).getData());
1565         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1566         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1567                 appendEntries.getEntries().get(1).getData());
1568
1569         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1570         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1571
1572         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1573
1574         ApplyState applyState = applyStateList.get(0);
1575         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1576         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1577         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1578                 applyState.getReplicatedLogEntry().getData());
1579
1580         applyState = applyStateList.get(1);
1581         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1582         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1583         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1584                 applyState.getReplicatedLogEntry().getData());
1585
1586         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1587         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1588     }
1589
1590     @Test
1591     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1592         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1593
1594         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1595         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1596                 new FiniteDuration(1000, TimeUnit.SECONDS));
1597
1598         leaderActorContext.setReplicatedLog(
1599                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1600         long leaderCommitIndex = 1;
1601         leaderActorContext.setCommitIndex(leaderCommitIndex);
1602         leaderActorContext.setLastApplied(leaderCommitIndex);
1603
1604         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1605         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1606
1607         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1608
1609         followerActorContext.setReplicatedLog(
1610                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1611         followerActorContext.setCommitIndex(-1);
1612         followerActorContext.setLastApplied(-1);
1613
1614         Follower follower = new Follower(followerActorContext);
1615         followerActor.underlyingActor().setBehavior(follower);
1616         followerActorContext.setCurrentBehavior(follower);
1617
1618         leader = new Leader(leaderActorContext);
1619
1620         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1621         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1622                 AppendEntriesReply.class);
1623
1624         MessageCollectorActor.clearMessages(followerActor);
1625         MessageCollectorActor.clearMessages(leaderActor);
1626
1627         // Verify initial AppendEntries sent with the leader's current commit index.
1628         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1629         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1630         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1631
1632         leaderActor.underlyingActor().setBehavior(leader);
1633         leaderActorContext.setCurrentBehavior(leader);
1634
1635         leader.handleMessage(followerActor, appendEntriesReply);
1636
1637         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1638         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1639
1640         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1641         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1642         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1643
1644         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1645         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1646         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1647                 appendEntries.getEntries().get(0).getData());
1648         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1649         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1650         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1651                 appendEntries.getEntries().get(1).getData());
1652
1653         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1654         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1655
1656         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1657
1658         ApplyState applyState = applyStateList.get(0);
1659         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1660         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1661         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1662                 applyState.getReplicatedLogEntry().getData());
1663
1664         applyState = applyStateList.get(1);
1665         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1666         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1667         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1668                 applyState.getReplicatedLogEntry().getData());
1669
1670         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1671         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1672         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1673     }
1674
1675     @Test
1676     public void testHandleAppendEntriesReplyWithNewerTerm() {
1677         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1678
1679         MockRaftActorContext leaderActorContext = createActorContext();
1680         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1681                 new FiniteDuration(10000, TimeUnit.SECONDS));
1682
1683         leaderActorContext.setReplicatedLog(
1684                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1685
1686         leader = new Leader(leaderActorContext);
1687         leaderActor.underlyingActor().setBehavior(leader);
1688         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1689
1690         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1691                 AppendEntriesReply.class);
1692
1693         assertEquals(false, appendEntriesReply.isSuccess());
1694         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1695
1696         MessageCollectorActor.clearMessages(leaderActor);
1697     }
1698
1699     @Test
1700     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1701         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1702
1703         MockRaftActorContext leaderActorContext = createActorContext();
1704         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1705                 new FiniteDuration(10000, TimeUnit.SECONDS));
1706
1707         leaderActorContext.setReplicatedLog(
1708                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1709         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1710
1711         leader = new Leader(leaderActorContext);
1712         leaderActor.underlyingActor().setBehavior(leader);
1713         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1714
1715         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1716                 AppendEntriesReply.class);
1717
1718         assertEquals(false, appendEntriesReply.isSuccess());
1719         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1720
1721         MessageCollectorActor.clearMessages(leaderActor);
1722     }
1723
1724     @Test
1725     public void testHandleAppendEntriesReplySuccess() {
1726         logStart("testHandleAppendEntriesReplySuccess");
1727
1728         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1729
1730         leaderActorContext.setReplicatedLog(
1731                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1732
1733         leaderActorContext.setCommitIndex(1);
1734         leaderActorContext.setLastApplied(1);
1735         leaderActorContext.getTermInformation().update(1, "leader");
1736
1737         leader = new Leader(leaderActorContext);
1738
1739         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1740
1741         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1742         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1743
1744         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1745
1746         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1747
1748         assertEquals(RaftState.Leader, raftActorBehavior.state());
1749
1750         assertEquals(2, leaderActorContext.getCommitIndex());
1751
1752         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1753                 leaderActor, ApplyJournalEntries.class);
1754
1755         assertEquals(2, leaderActorContext.getLastApplied());
1756
1757         assertEquals(2, applyJournalEntries.getToIndex());
1758
1759         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1760                 ApplyState.class);
1761
1762         assertEquals(1,applyStateList.size());
1763
1764         ApplyState applyState = applyStateList.get(0);
1765
1766         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1767
1768         assertEquals(2, followerInfo.getMatchIndex());
1769         assertEquals(3, followerInfo.getNextIndex());
1770         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1771         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1772     }
1773
1774     @Test
1775     public void testHandleAppendEntriesReplyUnknownFollower() {
1776         logStart("testHandleAppendEntriesReplyUnknownFollower");
1777
1778         MockRaftActorContext leaderActorContext = createActorContext();
1779
1780         leader = new Leader(leaderActorContext);
1781
1782         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1783
1784         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1785
1786         assertEquals(RaftState.Leader, raftActorBehavior.state());
1787     }
1788
1789     @Test
1790     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1791         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1792
1793         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1794         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1795                 new FiniteDuration(1000, TimeUnit.SECONDS));
1796         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1797
1798         leaderActorContext.setReplicatedLog(
1799                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1800         long leaderCommitIndex = 3;
1801         leaderActorContext.setCommitIndex(leaderCommitIndex);
1802         leaderActorContext.setLastApplied(leaderCommitIndex);
1803
1804         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1805         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1806         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1807         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1808
1809         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1810
1811         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1812         followerActorContext.setCommitIndex(-1);
1813         followerActorContext.setLastApplied(-1);
1814
1815         Follower follower = new Follower(followerActorContext);
1816         followerActor.underlyingActor().setBehavior(follower);
1817         followerActorContext.setCurrentBehavior(follower);
1818
1819         leader = new Leader(leaderActorContext);
1820
1821         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1822         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1823                 AppendEntriesReply.class);
1824
1825         MessageCollectorActor.clearMessages(followerActor);
1826         MessageCollectorActor.clearMessages(leaderActor);
1827
1828         // Verify initial AppendEntries sent with the leader's current commit index.
1829         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1830         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1831         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1832
1833         leaderActor.underlyingActor().setBehavior(leader);
1834         leaderActorContext.setCurrentBehavior(leader);
1835
1836         leader.handleMessage(followerActor, appendEntriesReply);
1837
1838         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1839                 AppendEntries.class, 2);
1840         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1841
1842         appendEntries = appendEntriesList.get(0);
1843         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1844         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1845         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1846
1847         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1848         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1849                 appendEntries.getEntries().get(0).getData());
1850         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1851         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1852                 appendEntries.getEntries().get(1).getData());
1853
1854         appendEntries = appendEntriesList.get(1);
1855         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1856         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1857         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1858
1859         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1860         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1861                 appendEntries.getEntries().get(0).getData());
1862         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1863         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1864                 appendEntries.getEntries().get(1).getData());
1865
1866         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1867         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1868
1869         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1870
1871         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1872         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1873     }
1874
1875     @Test
1876     public void testHandleRequestVoteReply() {
1877         logStart("testHandleRequestVoteReply");
1878
1879         MockRaftActorContext leaderActorContext = createActorContext();
1880
1881         leader = new Leader(leaderActorContext);
1882
1883         // Should be a no-op.
1884         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1885                 new RequestVoteReply(1, true));
1886
1887         assertEquals(RaftState.Leader, raftActorBehavior.state());
1888
1889         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1890
1891         assertEquals(RaftState.Leader, raftActorBehavior.state());
1892     }
1893
1894     @Test
1895     public void testIsolatedLeaderCheckNoFollowers() {
1896         logStart("testIsolatedLeaderCheckNoFollowers");
1897
1898         MockRaftActorContext leaderActorContext = createActorContext();
1899
1900         leader = new Leader(leaderActorContext);
1901         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1902         assertTrue(newBehavior instanceof Leader);
1903     }
1904
1905     @Test
1906     public void testIsolatedLeaderCheckNoVotingFollowers() {
1907         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1908
1909         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1910         Follower follower = new Follower(followerActorContext);
1911         followerActor.underlyingActor().setBehavior(follower);
1912
1913         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1914         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1915                 new FiniteDuration(1000, TimeUnit.SECONDS));
1916         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1917
1918         leader = new Leader(leaderActorContext);
1919         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1920         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1921         assertTrue("Expected Leader", newBehavior instanceof Leader);
1922     }
1923
1924     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1925         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1926         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1927
1928         MockRaftActorContext leaderActorContext = createActorContext();
1929
1930         Map<String, String> peerAddresses = new HashMap<>();
1931         peerAddresses.put("follower-1", followerActor1.path().toString());
1932         peerAddresses.put("follower-2", followerActor2.path().toString());
1933
1934         leaderActorContext.setPeerAddresses(peerAddresses);
1935         leaderActorContext.setRaftPolicy(raftPolicy);
1936
1937         leader = new Leader(leaderActorContext);
1938
1939         leader.markFollowerActive("follower-1");
1940         leader.markFollowerActive("follower-2");
1941         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1942         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1943
1944         // kill 1 follower and verify if that got killed
1945         final TestKit probe = new TestKit(getSystem());
1946         probe.watch(followerActor1);
1947         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1948         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1949         assertEquals(termMsg1.getActor(), followerActor1);
1950
1951         leader.markFollowerInActive("follower-1");
1952         leader.markFollowerActive("follower-2");
1953         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1954         assertTrue("Behavior not instance of Leader when majority of followers are active",
1955                 newBehavior instanceof Leader);
1956
1957         // kill 2nd follower and leader should change to Isolated leader
1958         followerActor2.tell(PoisonPill.getInstance(), null);
1959         probe.watch(followerActor2);
1960         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1961         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1962         assertEquals(termMsg2.getActor(), followerActor2);
1963
1964         leader.markFollowerInActive("follower-2");
1965         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1966     }
1967
1968     @Test
1969     public void testIsolatedLeaderCheckTwoFollowers() {
1970         logStart("testIsolatedLeaderCheckTwoFollowers");
1971
1972         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1973
1974         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1975             newBehavior instanceof IsolatedLeader);
1976     }
1977
1978     @Test
1979     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1980         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1981
1982         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1983
1984         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1985                 newBehavior instanceof Leader);
1986     }
1987
1988     @Test
1989     public void testLaggingFollowerStarvation() {
1990         logStart("testLaggingFollowerStarvation");
1991
1992         String leaderActorId = actorFactory.generateActorId("leader");
1993         String follower1ActorId = actorFactory.generateActorId("follower");
1994         String follower2ActorId = actorFactory.generateActorId("follower");
1995
1996         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1997         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1998
1999         MockRaftActorContext leaderActorContext =
2000                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
2001
2002         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
2003         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
2004         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
2005
2006         leaderActorContext.setConfigParams(configParams);
2007
2008         leaderActorContext.setReplicatedLog(
2009                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
2010
2011         Map<String, String> peerAddresses = new HashMap<>();
2012         peerAddresses.put(follower1ActorId,
2013                 follower1Actor.path().toString());
2014         peerAddresses.put(follower2ActorId,
2015                 follower2Actor.path().toString());
2016
2017         leaderActorContext.setPeerAddresses(peerAddresses);
2018         leaderActorContext.getTermInformation().update(1, leaderActorId);
2019
2020         leader = createBehavior(leaderActorContext);
2021
2022         leaderActor.underlyingActor().setBehavior(leader);
2023
2024         for (int i = 1; i < 6; i++) {
2025             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2026             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2027                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2028             assertTrue(newBehavior == leader);
2029             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2030         }
2031
2032         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2033         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2034
2035         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2036                 heartbeats.size() > 1);
2037
2038         // Check if follower-2 got AppendEntries during this time and was not starved
2039         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2040
2041         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2042                 appendEntries.size() > 1);
2043     }
2044
2045     @Test
2046     public void testReplicationConsensusWithNonVotingFollower() {
2047         logStart("testReplicationConsensusWithNonVotingFollower");
2048
2049         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2050         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2051                 new FiniteDuration(1000, TimeUnit.SECONDS));
2052
2053         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2054         leaderActorContext.setCommitIndex(-1);
2055         leaderActorContext.setLastApplied(-1);
2056
2057         String nonVotingFollowerId = "nonvoting-follower";
2058         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2059                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2060
2061         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2062                 VotingState.NON_VOTING);
2063
2064         leader = new Leader(leaderActorContext);
2065         leaderActorContext.setCurrentBehavior(leader);
2066
2067         // Ignore initial heartbeats
2068         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2069         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2070
2071         MessageCollectorActor.clearMessages(followerActor);
2072         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2073         MessageCollectorActor.clearMessages(leaderActor);
2074
2075         // Send a Replicate message and wait for AppendEntries.
2076         sendReplicate(leaderActorContext, 0);
2077
2078         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2079         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2080
2081         // Send reply only from the voting follower and verify consensus via ApplyState.
2082         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2083
2084         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2085
2086         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2087
2088         MessageCollectorActor.clearMessages(followerActor);
2089         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2090         MessageCollectorActor.clearMessages(leaderActor);
2091
2092         // Send another Replicate message
2093         sendReplicate(leaderActorContext, 1);
2094
2095         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2096         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2097                 AppendEntries.class);
2098         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2099         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2100
2101         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2102         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2103
2104         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2105
2106         // Send reply from the voting follower and verify consensus.
2107         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2108
2109         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2110     }
2111
2112     @Test
2113     public void testTransferLeadershipWithFollowerInSync() {
2114         logStart("testTransferLeadershipWithFollowerInSync");
2115
2116         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2117         leaderActorContext.setLastApplied(-1);
2118         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2119                 new FiniteDuration(1000, TimeUnit.SECONDS));
2120         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2121
2122         leader = new Leader(leaderActorContext);
2123         leaderActorContext.setCurrentBehavior(leader);
2124
2125         // Initial heartbeat
2126         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2127         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2128         MessageCollectorActor.clearMessages(followerActor);
2129
2130         sendReplicate(leaderActorContext, 0);
2131         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2132
2133         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2134         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2135         MessageCollectorActor.clearMessages(followerActor);
2136
2137         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2138         leader.transferLeadership(mockTransferCohort);
2139
2140         verify(mockTransferCohort, never()).transferComplete();
2141         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2142         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2143         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2144
2145         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2146         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2147
2148         // Leader should force an election timeout
2149         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2150
2151         verify(mockTransferCohort).transferComplete();
2152     }
2153
2154     @Test
2155     public void testTransferLeadershipWithEmptyLog() {
2156         logStart("testTransferLeadershipWithEmptyLog");
2157
2158         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2159         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2160                 new FiniteDuration(1000, TimeUnit.SECONDS));
2161         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2162
2163         leader = new Leader(leaderActorContext);
2164         leaderActorContext.setCurrentBehavior(leader);
2165
2166         // Initial heartbeat
2167         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2168         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2169         MessageCollectorActor.clearMessages(followerActor);
2170
2171         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2172         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2173         leader.transferLeadership(mockTransferCohort);
2174
2175         verify(mockTransferCohort, never()).transferComplete();
2176         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2177         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2178
2179         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2180         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2181
2182         // Leader should force an election timeout
2183         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2184
2185         verify(mockTransferCohort).transferComplete();
2186     }
2187
2188     @Test
2189     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2190         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2191
2192         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2193         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2194                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2195
2196         leader = new Leader(leaderActorContext);
2197         leaderActorContext.setCurrentBehavior(leader);
2198
2199         // Initial heartbeat
2200         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2201         MessageCollectorActor.clearMessages(followerActor);
2202
2203         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2204         doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
2205         leader.transferLeadership(mockTransferCohort);
2206
2207         verify(mockTransferCohort, never()).transferComplete();
2208
2209         // Sync up the follower.
2210         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2211         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2212         MessageCollectorActor.clearMessages(followerActor);
2213
2214         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2215                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2216         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2217         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2218         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2219
2220         // Leader should force an election timeout
2221         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2222
2223         verify(mockTransferCohort).transferComplete();
2224     }
2225
2226     @Test
2227     public void testTransferLeadershipWithFollowerSyncTimeout() {
2228         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2229
2230         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2231         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2232                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2233         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2234         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2235
2236         leader = new Leader(leaderActorContext);
2237         leaderActorContext.setCurrentBehavior(leader);
2238
2239         // Initial heartbeat
2240         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2241         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2242         MessageCollectorActor.clearMessages(followerActor);
2243
2244         sendReplicate(leaderActorContext, 0);
2245         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2246
2247         MessageCollectorActor.clearMessages(followerActor);
2248
2249         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2250         leader.transferLeadership(mockTransferCohort);
2251
2252         verify(mockTransferCohort, never()).transferComplete();
2253
2254         // Send heartbeats to time out the transfer.
2255         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2256             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2257                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2258             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2259         }
2260
2261         verify(mockTransferCohort).abortTransfer();
2262         verify(mockTransferCohort, never()).transferComplete();
2263         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2264     }
2265
2266     @Test
2267     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2268         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2269
2270         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2271                 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2272                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2273         final MockRaftActorContext.MockPayload largePayload =
2274                 new MockRaftActorContext.MockPayload("large", serializedSize);
2275
2276         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2277         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2278                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2279         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2280         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2281         leaderActorContext.setCommitIndex(-1);
2282         leaderActorContext.setLastApplied(-1);
2283
2284         leader = new Leader(leaderActorContext);
2285         leaderActorContext.setCurrentBehavior(leader);
2286
2287         // Send initial heartbeat reply so follower is marked active
2288         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2289         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2290         MessageCollectorActor.clearMessages(followerActor);
2291
2292         // Send normal payload first to prime commit index.
2293         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2294         sendReplicate(leaderActorContext, term, 0);
2295
2296         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2297         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2298         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2299
2300         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2301         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2302         MessageCollectorActor.clearMessages(followerActor);
2303
2304         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2305         sendReplicate(leaderActorContext, term, 1, largePayload);
2306
2307         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2308         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2309         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2310
2311         final Identifier slicingId = messageSlice.getIdentifier();
2312
2313         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2314         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2315         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2316         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2317         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2318         MessageCollectorActor.clearMessages(followerActor);
2319
2320         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2321
2322         // Sleep for the heartbeat interval so AppendEntries is sent.
2323         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2324                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2325
2326         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2327
2328         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2329         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2330         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2331         MessageCollectorActor.clearMessages(followerActor);
2332
2333         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2334
2335         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2336         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2337         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2338
2339         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2340
2341         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2342
2343         MessageCollectorActor.clearMessages(followerActor);
2344
2345         // Send another normal payload.
2346
2347         sendReplicate(leaderActorContext, term, 2);
2348
2349         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2350         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2351         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2352         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2353     }
2354
2355     @Test
2356     public void testLargePayloadSlicingExpiration() {
2357         logStart("testLargePayloadSlicingExpiration");
2358
2359         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2360         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2361                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2362         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2363         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2364         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2365         leaderActorContext.setCommitIndex(-1);
2366         leaderActorContext.setLastApplied(-1);
2367
2368         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2369         leader = new Leader(leaderActorContext);
2370         leaderActorContext.setCurrentBehavior(leader);
2371
2372         // Send initial heartbeat reply so follower is marked active
2373         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2374         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2375         MessageCollectorActor.clearMessages(followerActor);
2376
2377         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2378                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2379         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2380
2381         // Sleep for at least 3 * election timeout so the slicing state expires.
2382         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2383                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2384         MessageCollectorActor.clearMessages(followerActor);
2385
2386         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2387
2388         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2389         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2390         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2391
2392         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2393         MessageCollectorActor.clearMessages(followerActor);
2394
2395         // Send an AppendEntriesReply - this should restart the slicing.
2396
2397         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2398                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2399
2400         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2401
2402         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2403     }
2404
2405     @Test
2406     public void testLeaderAddressInAppendEntries() {
2407         logStart("testLeaderAddressInAppendEntries");
2408
2409         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2410         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2411                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2412         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2413         leaderActorContext.setCommitIndex(-1);
2414         leaderActorContext.setLastApplied(-1);
2415
2416         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2417             peerId -> leaderActor.path().toString());
2418
2419         leader = new Leader(leaderActorContext);
2420         leaderActorContext.setCurrentBehavior(leader);
2421
2422         // Initial heartbeat shouldn't have the leader address
2423
2424         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2425         assertFalse(appendEntries.getLeaderAddress().isPresent());
2426         MessageCollectorActor.clearMessages(followerActor);
2427
2428         // Send AppendEntriesReply indicating the follower needs the leader address
2429
2430         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2431                 RaftVersions.CURRENT_VERSION));
2432
2433         // Sleep for the heartbeat interval so AppendEntries is sent.
2434         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2435                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2436
2437         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2438
2439         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2440         assertTrue(appendEntries.getLeaderAddress().isPresent());
2441         assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2442         MessageCollectorActor.clearMessages(followerActor);
2443
2444         // Send AppendEntriesReply indicating the follower does not need the leader address
2445
2446         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2447                 RaftVersions.CURRENT_VERSION));
2448
2449         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2450                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2451
2452         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2453
2454         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2455         assertFalse(appendEntries.getLeaderAddress().isPresent());
2456     }
2457
2458     @Override
2459     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2460             final ActorRef actorRef, final RaftRPC rpc) {
2461         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2462         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2463     }
2464
2465     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2466
2467         private final long electionTimeOutIntervalMillis;
2468         private final int snapshotChunkSize;
2469
2470         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2471             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2472             this.snapshotChunkSize = snapshotChunkSize;
2473         }
2474
2475         @Override
2476         public FiniteDuration getElectionTimeOutInterval() {
2477             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2478         }
2479
2480         @Override
2481         public int getSnapshotChunkSize() {
2482             return snapshotChunkSize;
2483         }
2484     }
2485 }