Bug 7407 - Add request leadership functionality to shards
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.JavaTestKit;
27 import akka.testkit.TestActorRef;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
43 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
44 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
45 import org.opendaylight.controller.cluster.raft.RaftActorContext;
46 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
47 import org.opendaylight.controller.cluster.raft.RaftState;
48 import org.opendaylight.controller.cluster.raft.RaftVersions;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.VotingState;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
54 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
55 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
56 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
59 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
60 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
61 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
63 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
65 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
67 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
69 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
70 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
71 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
72 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
73 import org.opendaylight.yangtools.concepts.Identifier;
74 import scala.concurrent.duration.FiniteDuration;
75
76 public class LeaderTest extends AbstractLeaderTest<Leader> {
77
78     static final String FOLLOWER_ID = "follower";
79     public static final String LEADER_ID = "leader";
80
81     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
82             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
83
84     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
85             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
86
87     private Leader leader;
88     private final short payloadVersion = 5;
89
90     @Override
91     @After
92     public void tearDown() throws Exception {
93         if (leader != null) {
94             leader.close();
95         }
96
97         super.tearDown();
98     }
99
100     @Test
101     public void testHandleMessageForUnknownMessage() throws Exception {
102         logStart("testHandleMessageForUnknownMessage");
103
104         leader = new Leader(createActorContext());
105
106         // handle message should null when it receives an unknown message
107         assertNull(leader.handleMessage(followerActor, "foo"));
108     }
109
110     @Test
111     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
112         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
113
114         MockRaftActorContext actorContext = createActorContextWithFollower();
115         actorContext.setCommitIndex(-1);
116         actorContext.setPayloadVersion(payloadVersion);
117
118         long term = 1;
119         actorContext.getTermInformation().update(term, "");
120
121         leader = new Leader(actorContext);
122         actorContext.setCurrentBehavior(leader);
123
124         // Leader should send an immediate heartbeat with no entries as follower is inactive.
125         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
126         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
127         assertEquals("getTerm", term, appendEntries.getTerm());
128         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
129         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
130         assertEquals("Entries size", 0, appendEntries.getEntries().size());
131         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
132
133         // The follower would normally reply - simulate that explicitly here.
134         leader.handleMessage(followerActor, new AppendEntriesReply(
135                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
136         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
137
138         followerActor.underlyingActor().clear();
139
140         // Sleep for the heartbeat interval so AppendEntries is sent.
141         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
142                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
143
144         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
145
146         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
147         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
148         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
149         assertEquals("Entries size", 1, appendEntries.getEntries().size());
150         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
151         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
152         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
153     }
154
155
156     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
157         return sendReplicate(actorContext, 1, index);
158     }
159
160     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
161         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
162         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
163         actorContext.getReplicatedLog().append(newEntry);
164         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
165     }
166
167     @Test
168     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
169         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
170
171         MockRaftActorContext actorContext = createActorContextWithFollower();
172
173         long term = 1;
174         actorContext.getTermInformation().update(term, "");
175
176         leader = new Leader(actorContext);
177
178         // Leader will send an immediate heartbeat - ignore it.
179         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
180
181         // The follower would normally reply - simulate that explicitly here.
182         long lastIndex = actorContext.getReplicatedLog().lastIndex();
183         leader.handleMessage(followerActor, new AppendEntriesReply(
184                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
185         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
186
187         followerActor.underlyingActor().clear();
188
189         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
190
191         // State should not change
192         assertTrue(raftBehavior instanceof Leader);
193
194         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
195         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
196         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
197         assertEquals("Entries size", 1, appendEntries.getEntries().size());
198         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
199         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
200         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
201         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
202     }
203
204     @Test
205     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
206         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
207
208         MockRaftActorContext actorContext = createActorContextWithFollower();
209         actorContext.setCommitIndex(-1);
210         actorContext.setLastApplied(-1);
211
212         // The raft context is initialized with a couple log entries. However the commitIndex
213         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
214         // committed and applied. Now it regains leadership with a higher term (2).
215         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
216         long newTerm = prevTerm + 1;
217         actorContext.getTermInformation().update(newTerm, "");
218
219         leader = new Leader(actorContext);
220         actorContext.setCurrentBehavior(leader);
221
222         // Leader will send an immediate heartbeat - ignore it.
223         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
224
225         // The follower replies with the leader's current last index and term, simulating that it is
226         // up to date with the leader.
227         long lastIndex = actorContext.getReplicatedLog().lastIndex();
228         leader.handleMessage(followerActor, new AppendEntriesReply(
229                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
230
231         // The commit index should not get updated even though consensus was reached. This is b/c the
232         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
233         // from previous terms by counting replicas".
234         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
235
236         followerActor.underlyingActor().clear();
237
238         // Now replicate a new entry with the new term 2.
239         long newIndex = lastIndex + 1;
240         sendReplicate(actorContext, newTerm, newIndex);
241
242         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
243         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
244         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
245         assertEquals("Entries size", 1, appendEntries.getEntries().size());
246         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
247         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
248         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
249
250         // The follower replies with success. The leader should now update the commit index to the new index
251         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
252         // prior entries are committed indirectly".
253         leader.handleMessage(followerActor, new AppendEntriesReply(
254                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
255
256         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
257     }
258
259     @Test
260     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
261         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
262
263         MockRaftActorContext actorContext = createActorContextWithFollower();
264         actorContext.setRaftPolicy(createRaftPolicy(true, true));
265
266         long term = 1;
267         actorContext.getTermInformation().update(term, "");
268
269         leader = new Leader(actorContext);
270
271         // Leader will send an immediate heartbeat - ignore it.
272         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
273
274         // The follower would normally reply - simulate that explicitly here.
275         long lastIndex = actorContext.getReplicatedLog().lastIndex();
276         leader.handleMessage(followerActor, new AppendEntriesReply(
277                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
278         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
279
280         followerActor.underlyingActor().clear();
281
282         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
283
284         // State should not change
285         assertTrue(raftBehavior instanceof Leader);
286
287         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
288         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
289         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
290         assertEquals("Entries size", 1, appendEntries.getEntries().size());
291         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
292         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
293         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
294         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
295     }
296
297     @Test
298     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
299         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
300
301         MockRaftActorContext actorContext = createActorContextWithFollower();
302         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
303             @Override
304             public FiniteDuration getHeartBeatInterval() {
305                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
306             }
307         });
308
309         long term = 1;
310         actorContext.getTermInformation().update(term, "");
311
312         leader = new Leader(actorContext);
313
314         // Leader will send an immediate heartbeat - ignore it.
315         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
316
317         // The follower would normally reply - simulate that explicitly here.
318         long lastIndex = actorContext.getReplicatedLog().lastIndex();
319         leader.handleMessage(followerActor, new AppendEntriesReply(
320                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
321         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
322
323         followerActor.underlyingActor().clear();
324
325         for (int i = 0; i < 5; i++) {
326             sendReplicate(actorContext, lastIndex + i + 1);
327         }
328
329         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
330         // We expect only 1 message to be sent because of two reasons,
331         // - an append entries reply was not received
332         // - the heartbeat interval has not expired
333         // In this scenario if multiple messages are sent they would likely be duplicates
334         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
335     }
336
337     @Test
338     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
339         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
340
341         MockRaftActorContext actorContext = createActorContextWithFollower();
342         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
343             @Override
344             public FiniteDuration getHeartBeatInterval() {
345                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
346             }
347         });
348
349         long term = 1;
350         actorContext.getTermInformation().update(term, "");
351
352         leader = new Leader(actorContext);
353
354         // Leader will send an immediate heartbeat - ignore it.
355         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
356
357         // The follower would normally reply - simulate that explicitly here.
358         long lastIndex = actorContext.getReplicatedLog().lastIndex();
359         leader.handleMessage(followerActor, new AppendEntriesReply(
360                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
361         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
362
363         followerActor.underlyingActor().clear();
364
365         for (int i = 0; i < 3; i++) {
366             sendReplicate(actorContext, lastIndex + i + 1);
367             leader.handleMessage(followerActor, new AppendEntriesReply(
368                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
369
370         }
371
372         for (int i = 3; i < 5; i++) {
373             sendReplicate(actorContext, lastIndex + i + 1);
374         }
375
376         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
378         // get sent to the follower - but not the 5th
379         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
380
381         for (int i = 0; i < 4; i++) {
382             long expected = allMessages.get(i).getEntries().get(0).getIndex();
383             assertEquals(expected, i + 2);
384         }
385     }
386
387     @Test
388     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
389         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
390
391         MockRaftActorContext actorContext = createActorContextWithFollower();
392         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
393             @Override
394             public FiniteDuration getHeartBeatInterval() {
395                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
396             }
397         });
398
399         long term = 1;
400         actorContext.getTermInformation().update(term, "");
401
402         leader = new Leader(actorContext);
403
404         // Leader will send an immediate heartbeat - ignore it.
405         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
406
407         // The follower would normally reply - simulate that explicitly here.
408         long lastIndex = actorContext.getReplicatedLog().lastIndex();
409         leader.handleMessage(followerActor, new AppendEntriesReply(
410                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
411         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
412
413         followerActor.underlyingActor().clear();
414
415         sendReplicate(actorContext, lastIndex + 1);
416
417         // Wait slightly longer than heartbeat duration
418         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
419
420         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
421
422         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
423         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
424
425         assertEquals(1, allMessages.get(0).getEntries().size());
426         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
427         assertEquals(1, allMessages.get(1).getEntries().size());
428         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
429
430     }
431
432     @Test
433     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
434         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
435
436         MockRaftActorContext actorContext = createActorContextWithFollower();
437         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
438             @Override
439             public FiniteDuration getHeartBeatInterval() {
440                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
441             }
442         });
443
444         long term = 1;
445         actorContext.getTermInformation().update(term, "");
446
447         leader = new Leader(actorContext);
448
449         // Leader will send an immediate heartbeat - ignore it.
450         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
451
452         // The follower would normally reply - simulate that explicitly here.
453         long lastIndex = actorContext.getReplicatedLog().lastIndex();
454         leader.handleMessage(followerActor, new AppendEntriesReply(
455                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
456         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
457
458         followerActor.underlyingActor().clear();
459
460         for (int i = 0; i < 3; i++) {
461             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
462             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
463         }
464
465         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
466         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
467     }
468
469     @Test
470     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
471         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
472
473         MockRaftActorContext actorContext = createActorContextWithFollower();
474         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
475             @Override
476             public FiniteDuration getHeartBeatInterval() {
477                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
478             }
479         });
480
481         long term = 1;
482         actorContext.getTermInformation().update(term, "");
483
484         leader = new Leader(actorContext);
485
486         // Leader will send an immediate heartbeat - ignore it.
487         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
488
489         // The follower would normally reply - simulate that explicitly here.
490         long lastIndex = actorContext.getReplicatedLog().lastIndex();
491         leader.handleMessage(followerActor, new AppendEntriesReply(
492                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
493         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
494
495         followerActor.underlyingActor().clear();
496
497         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
498         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
499         sendReplicate(actorContext, lastIndex + 1);
500
501         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
502         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
503
504         assertEquals(0, allMessages.get(0).getEntries().size());
505         assertEquals(1, allMessages.get(1).getEntries().size());
506     }
507
508
509     @Test
510     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
511         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
512
513         MockRaftActorContext actorContext = createActorContext();
514
515         leader = new Leader(actorContext);
516
517         actorContext.setLastApplied(0);
518
519         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
520         long term = actorContext.getTermInformation().getCurrentTerm();
521         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
522                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
523
524         actorContext.getReplicatedLog().append(newEntry);
525
526         final Identifier id = new MockIdentifier("state-id");
527         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
528                 new Replicate(leaderActor, id, newEntry, true));
529
530         // State should not change
531         assertTrue(raftBehavior instanceof Leader);
532
533         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
534
535         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
536         // one since lastApplied state is 0.
537         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
538                 leaderActor, ApplyState.class);
539         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
540
541         for (int i = 0; i <= newLogIndex - 1; i++) {
542             ApplyState applyState = applyStateList.get(i);
543             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
544             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
545         }
546
547         ApplyState last = applyStateList.get((int) newLogIndex - 1);
548         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
549         assertEquals("getIdentifier", id, last.getIdentifier());
550     }
551
552     @Test
553     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
554         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
555
556         final MockRaftActorContext actorContext = createActorContextWithFollower();
557
558         Map<String, String> leadersSnapshot = new HashMap<>();
559         leadersSnapshot.put("1", "A");
560         leadersSnapshot.put("2", "B");
561         leadersSnapshot.put("3", "C");
562
563         //clears leaders log
564         actorContext.getReplicatedLog().removeFrom(0);
565
566         final int commitIndex = 3;
567         final int snapshotIndex = 2;
568         final int snapshotTerm = 1;
569
570         // set the snapshot variables in replicatedlog
571         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
572         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
573         actorContext.setCommitIndex(commitIndex);
574         //set follower timeout to 2 mins, helps during debugging
575         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
576
577         leader = new Leader(actorContext);
578
579         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
580         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
581
582         //update follower timestamp
583         leader.markFollowerActive(FOLLOWER_ID);
584
585         ByteString bs = toByteString(leadersSnapshot);
586         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
587                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
588                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
589         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
590                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
591         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
592         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
593
594         //send first chunk and no InstallSnapshotReply received yet
595         fts.getNextChunk();
596         fts.incrementChunkIndex();
597
598         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
599                 TimeUnit.MILLISECONDS);
600
601         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
602
603         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
604
605         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
606
607         //InstallSnapshotReply received
608         fts.markSendStatus(true);
609
610         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
611
612         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
613
614         assertEquals(commitIndex, is.getLastIncludedIndex());
615     }
616
617     @Test
618     public void testSendAppendEntriesSnapshotScenario() throws Exception {
619         logStart("testSendAppendEntriesSnapshotScenario");
620
621         final MockRaftActorContext actorContext = createActorContextWithFollower();
622
623         Map<String, String> leadersSnapshot = new HashMap<>();
624         leadersSnapshot.put("1", "A");
625         leadersSnapshot.put("2", "B");
626         leadersSnapshot.put("3", "C");
627
628         //clears leaders log
629         actorContext.getReplicatedLog().removeFrom(0);
630
631         final int followersLastIndex = 2;
632         final int snapshotIndex = 3;
633         final int newEntryIndex = 4;
634         final int snapshotTerm = 1;
635         final int currentTerm = 2;
636
637         // set the snapshot variables in replicatedlog
638         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
639         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
640         actorContext.setCommitIndex(followersLastIndex);
641
642         leader = new Leader(actorContext);
643
644         // Leader will send an immediate heartbeat - ignore it.
645         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
646
647         // new entry
648         SimpleReplicatedLogEntry entry =
649                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
650                         new MockRaftActorContext.MockPayload("D"));
651
652         actorContext.getReplicatedLog().append(entry);
653
654         //update follower timestamp
655         leader.markFollowerActive(FOLLOWER_ID);
656
657         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
658         RaftActorBehavior raftBehavior = leader.handleMessage(
659                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
660
661         assertTrue(raftBehavior instanceof Leader);
662
663         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
664     }
665
666     @Test
667     public void testInitiateInstallSnapshot() throws Exception {
668         logStart("testInitiateInstallSnapshot");
669
670         MockRaftActorContext actorContext = createActorContextWithFollower();
671
672         //clears leaders log
673         actorContext.getReplicatedLog().removeFrom(0);
674
675         final int followersLastIndex = 2;
676         final int snapshotIndex = 3;
677         final int newEntryIndex = 4;
678         final int snapshotTerm = 1;
679         final int currentTerm = 2;
680
681         // set the snapshot variables in replicatedlog
682         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
683         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
684         actorContext.setLastApplied(3);
685         actorContext.setCommitIndex(followersLastIndex);
686
687         leader = new Leader(actorContext);
688
689         // Leader will send an immediate heartbeat - ignore it.
690         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
691
692         // set the snapshot as absent and check if capture-snapshot is invoked.
693         leader.setSnapshot(null);
694
695         // new entry
696         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
697                 new MockRaftActorContext.MockPayload("D"));
698
699         actorContext.getReplicatedLog().append(entry);
700
701         //update follower timestamp
702         leader.markFollowerActive(FOLLOWER_ID);
703
704         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
705
706         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
707
708         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
709
710         assertEquals(3, cs.getLastAppliedIndex());
711         assertEquals(1, cs.getLastAppliedTerm());
712         assertEquals(4, cs.getLastIndex());
713         assertEquals(2, cs.getLastTerm());
714
715         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
716         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
717
718         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
719     }
720
721     @Test
722     public void testInitiateForceInstallSnapshot() throws Exception {
723         logStart("testInitiateForceInstallSnapshot");
724
725         MockRaftActorContext actorContext = createActorContextWithFollower();
726
727         final int followersLastIndex = 2;
728         final int snapshotIndex = -1;
729         final int newEntryIndex = 4;
730         final int snapshotTerm = -1;
731         final int currentTerm = 2;
732
733         // set the snapshot variables in replicatedlog
734         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
735         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
736         actorContext.setLastApplied(3);
737         actorContext.setCommitIndex(followersLastIndex);
738
739         actorContext.getReplicatedLog().removeFrom(0);
740
741         leader = new Leader(actorContext);
742         actorContext.setCurrentBehavior(leader);
743
744         // Leader will send an immediate heartbeat - ignore it.
745         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
746
747         // set the snapshot as absent and check if capture-snapshot is invoked.
748         leader.setSnapshot(null);
749
750         for (int i = 0; i < 4; i++) {
751             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
752                     new MockRaftActorContext.MockPayload("X" + i)));
753         }
754
755         // new entry
756         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
757                 new MockRaftActorContext.MockPayload("D"));
758
759         actorContext.getReplicatedLog().append(entry);
760
761         //update follower timestamp
762         leader.markFollowerActive(FOLLOWER_ID);
763
764         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
765         // installed with a SendInstallSnapshot
766         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
767
768         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
769
770         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
771
772         assertEquals(3, cs.getLastAppliedIndex());
773         assertEquals(1, cs.getLastAppliedTerm());
774         assertEquals(4, cs.getLastIndex());
775         assertEquals(2, cs.getLastTerm());
776
777         // if an initiate is started again when first is in progress, it should not initiate Capture
778         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
779
780         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
781     }
782
783
784     @Test
785     public void testInstallSnapshot() throws Exception {
786         logStart("testInstallSnapshot");
787
788         final MockRaftActorContext actorContext = createActorContextWithFollower();
789
790         Map<String, String> leadersSnapshot = new HashMap<>();
791         leadersSnapshot.put("1", "A");
792         leadersSnapshot.put("2", "B");
793         leadersSnapshot.put("3", "C");
794
795         //clears leaders log
796         actorContext.getReplicatedLog().removeFrom(0);
797
798         final int lastAppliedIndex = 3;
799         final int snapshotIndex = 2;
800         final int snapshotTerm = 1;
801         final int currentTerm = 2;
802
803         // set the snapshot variables in replicatedlog
804         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
805         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
806         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
807         actorContext.setCommitIndex(lastAppliedIndex);
808         actorContext.setLastApplied(lastAppliedIndex);
809
810         leader = new Leader(actorContext);
811
812         // Initial heartbeat.
813         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
814
815         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
816         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
817
818         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
819         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
820                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
821
822         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
823                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
824
825         assertTrue(raftBehavior instanceof Leader);
826
827         // check if installsnapshot gets called with the correct values.
828
829         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
830                 InstallSnapshot.class);
831
832         assertNotNull(installSnapshot.getData());
833         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
834         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
835
836         assertEquals(currentTerm, installSnapshot.getTerm());
837     }
838
839     @Test
840     public void testForceInstallSnapshot() throws Exception {
841         logStart("testForceInstallSnapshot");
842
843         final MockRaftActorContext actorContext = createActorContextWithFollower();
844
845         Map<String, String> leadersSnapshot = new HashMap<>();
846         leadersSnapshot.put("1", "A");
847         leadersSnapshot.put("2", "B");
848         leadersSnapshot.put("3", "C");
849
850         final int lastAppliedIndex = 3;
851         final int snapshotIndex = -1;
852         final int snapshotTerm = -1;
853         final int currentTerm = 2;
854
855         // set the snapshot variables in replicatedlog
856         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
857         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
858         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
859         actorContext.setCommitIndex(lastAppliedIndex);
860         actorContext.setLastApplied(lastAppliedIndex);
861
862         leader = new Leader(actorContext);
863
864         // Initial heartbeat.
865         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
866
867         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
868         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
869
870         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
871         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
872                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
873
874         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
875                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
876
877         assertTrue(raftBehavior instanceof Leader);
878
879         // check if installsnapshot gets called with the correct values.
880
881         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
882                 InstallSnapshot.class);
883
884         assertNotNull(installSnapshot.getData());
885         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
886         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
887
888         assertEquals(currentTerm, installSnapshot.getTerm());
889     }
890
891     @Test
892     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
893         logStart("testHandleInstallSnapshotReplyLastChunk");
894
895         MockRaftActorContext actorContext = createActorContextWithFollower();
896
897         final int commitIndex = 3;
898         final int snapshotIndex = 2;
899         final int snapshotTerm = 1;
900         final int currentTerm = 2;
901
902         actorContext.setCommitIndex(commitIndex);
903
904         leader = new Leader(actorContext);
905         actorContext.setCurrentBehavior(leader);
906
907         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
908         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
909
910         // Ignore initial heartbeat.
911         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
912
913         Map<String, String> leadersSnapshot = new HashMap<>();
914         leadersSnapshot.put("1", "A");
915         leadersSnapshot.put("2", "B");
916         leadersSnapshot.put("3", "C");
917
918         // set the snapshot variables in replicatedlog
919
920         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
921         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
922         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
923
924         ByteString bs = toByteString(leadersSnapshot);
925         leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
926                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
927                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
928         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
929                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
930         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
931         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
932         while (!fts.isLastChunk(fts.getChunkIndex())) {
933             fts.getNextChunk();
934             fts.incrementChunkIndex();
935         }
936
937         //clears leaders log
938         actorContext.getReplicatedLog().removeFrom(0);
939
940         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
941                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
942
943         assertTrue(raftBehavior instanceof Leader);
944
945         assertEquals(1, leader.followerLogSize());
946         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
947         assertNotNull(fli);
948         assertNull(fli.getInstallSnapshotState());
949         assertEquals(commitIndex, fli.getMatchIndex());
950         assertEquals(commitIndex + 1, fli.getNextIndex());
951         assertFalse(leader.hasSnapshot());
952     }
953
954     @Test
955     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
956         logStart("testSendSnapshotfromInstallSnapshotReply");
957
958         MockRaftActorContext actorContext = createActorContextWithFollower();
959
960         final int commitIndex = 3;
961         final int snapshotIndex = 2;
962         final int snapshotTerm = 1;
963         final int currentTerm = 2;
964
965         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
966             @Override
967             public int getSnapshotChunkSize() {
968                 return 50;
969             }
970         };
971         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
972         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
973
974         actorContext.setConfigParams(configParams);
975         actorContext.setCommitIndex(commitIndex);
976
977         leader = new Leader(actorContext);
978         actorContext.setCurrentBehavior(leader);
979
980         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
981         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
982
983         Map<String, String> leadersSnapshot = new HashMap<>();
984         leadersSnapshot.put("1", "A");
985         leadersSnapshot.put("2", "B");
986         leadersSnapshot.put("3", "C");
987
988         // set the snapshot variables in replicatedlog
989         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
990         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
991         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
992
993         ByteString bs = toByteString(leadersSnapshot);
994         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
995                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
996                 -1, null, null);
997
998         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
999
1000         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1001                 InstallSnapshot.class);
1002
1003         assertEquals(1, installSnapshot.getChunkIndex());
1004         assertEquals(3, installSnapshot.getTotalChunks());
1005
1006         followerActor.underlyingActor().clear();
1007         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1008                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1009
1010         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1011
1012         assertEquals(2, installSnapshot.getChunkIndex());
1013         assertEquals(3, installSnapshot.getTotalChunks());
1014
1015         followerActor.underlyingActor().clear();
1016         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1017                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1018
1019         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1020
1021         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1022         followerActor.underlyingActor().clear();
1023         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1024                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1025
1026         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1027
1028         assertNull(installSnapshot);
1029     }
1030
1031
1032     @Test
1033     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1034         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1035
1036         MockRaftActorContext actorContext = createActorContextWithFollower();
1037
1038         final int commitIndex = 3;
1039         final int snapshotIndex = 2;
1040         final int snapshotTerm = 1;
1041         final int currentTerm = 2;
1042
1043         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1044             @Override
1045             public int getSnapshotChunkSize() {
1046                 return 50;
1047             }
1048         });
1049
1050         actorContext.setCommitIndex(commitIndex);
1051
1052         leader = new Leader(actorContext);
1053
1054         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1055         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1056
1057         Map<String, String> leadersSnapshot = new HashMap<>();
1058         leadersSnapshot.put("1", "A");
1059         leadersSnapshot.put("2", "B");
1060         leadersSnapshot.put("3", "C");
1061
1062         // set the snapshot variables in replicatedlog
1063         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1064         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1065         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1066
1067         ByteString bs = toByteString(leadersSnapshot);
1068         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1069                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1070                 -1, null, null);
1071
1072         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1073         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1074
1075         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1076                 InstallSnapshot.class);
1077
1078         assertEquals(1, installSnapshot.getChunkIndex());
1079         assertEquals(3, installSnapshot.getTotalChunks());
1080
1081         followerActor.underlyingActor().clear();
1082
1083         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1084                 FOLLOWER_ID, -1, false));
1085
1086         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1087                 TimeUnit.MILLISECONDS);
1088
1089         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1090
1091         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1092
1093         assertEquals(1, installSnapshot.getChunkIndex());
1094         assertEquals(3, installSnapshot.getTotalChunks());
1095     }
1096
1097     @Test
1098     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1099         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1100
1101         MockRaftActorContext actorContext = createActorContextWithFollower();
1102
1103         final int commitIndex = 3;
1104         final int snapshotIndex = 2;
1105         final int snapshotTerm = 1;
1106         final int currentTerm = 2;
1107
1108         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1109             @Override
1110             public int getSnapshotChunkSize() {
1111                 return 50;
1112             }
1113         });
1114
1115         actorContext.setCommitIndex(commitIndex);
1116
1117         leader = new Leader(actorContext);
1118
1119         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1120         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1121
1122         Map<String, String> leadersSnapshot = new HashMap<>();
1123         leadersSnapshot.put("1", "A");
1124         leadersSnapshot.put("2", "B");
1125         leadersSnapshot.put("3", "C");
1126
1127         // set the snapshot variables in replicatedlog
1128         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1129         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1130         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1131
1132         ByteString bs = toByteString(leadersSnapshot);
1133         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1134                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1135                 -1, null, null);
1136
1137         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1138
1139         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1140                 InstallSnapshot.class);
1141
1142         assertEquals(1, installSnapshot.getChunkIndex());
1143         assertEquals(3, installSnapshot.getTotalChunks());
1144         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1145                 installSnapshot.getLastChunkHashCode().get().intValue());
1146
1147         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1148
1149         followerActor.underlyingActor().clear();
1150
1151         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1152                 FOLLOWER_ID, 1, true));
1153
1154         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1155
1156         assertEquals(2, installSnapshot.getChunkIndex());
1157         assertEquals(3, installSnapshot.getTotalChunks());
1158         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1159     }
1160
1161     @Test
1162     public void testLeaderInstallSnapshotState() throws IOException {
1163         logStart("testLeaderInstallSnapshotState");
1164
1165         Map<String, String> leadersSnapshot = new HashMap<>();
1166         leadersSnapshot.put("1", "A");
1167         leadersSnapshot.put("2", "B");
1168         leadersSnapshot.put("3", "C");
1169
1170         ByteString bs = toByteString(leadersSnapshot);
1171         byte[] barray = bs.toByteArray();
1172
1173         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1174         fts.setSnapshotBytes(ByteSource.wrap(barray));
1175
1176         assertEquals(bs.size(), barray.length);
1177
1178         int chunkIndex = 0;
1179         for (int i = 0; i < barray.length; i = i + 50) {
1180             int length = i + 50;
1181             chunkIndex++;
1182
1183             if (i + 50 > barray.length) {
1184                 length = barray.length;
1185             }
1186
1187             byte[] chunk = fts.getNextChunk();
1188             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1189             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1190
1191             fts.markSendStatus(true);
1192             if (!fts.isLastChunk(chunkIndex)) {
1193                 fts.incrementChunkIndex();
1194             }
1195         }
1196
1197         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1198         fts.close();
1199     }
1200
1201     @Override
1202     protected Leader createBehavior(final RaftActorContext actorContext) {
1203         return new Leader(actorContext);
1204     }
1205
1206     @Override
1207     protected MockRaftActorContext createActorContext() {
1208         return createActorContext(leaderActor);
1209     }
1210
1211     @Override
1212     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1213         return createActorContext(LEADER_ID, actorRef);
1214     }
1215
1216     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1217         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1218         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1219         configParams.setElectionTimeoutFactor(100000);
1220         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1221         context.setConfigParams(configParams);
1222         context.setPayloadVersion(payloadVersion);
1223         return context;
1224     }
1225
1226     private MockRaftActorContext createActorContextWithFollower() {
1227         MockRaftActorContext actorContext = createActorContext();
1228         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1229                 followerActor.path().toString()).build());
1230         return actorContext;
1231     }
1232
1233     private MockRaftActorContext createFollowerActorContextWithLeader() {
1234         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1235         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1236         followerConfig.setElectionTimeoutFactor(10000);
1237         followerActorContext.setConfigParams(followerConfig);
1238         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1239         return followerActorContext;
1240     }
1241
1242     @Test
1243     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1244         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1245
1246         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1247
1248         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1249
1250         Follower follower = new Follower(followerActorContext);
1251         followerActor.underlyingActor().setBehavior(follower);
1252         followerActorContext.setCurrentBehavior(follower);
1253
1254         Map<String, String> peerAddresses = new HashMap<>();
1255         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1256
1257         leaderActorContext.setPeerAddresses(peerAddresses);
1258
1259         leaderActorContext.getReplicatedLog().removeFrom(0);
1260
1261         //create 3 entries
1262         leaderActorContext.setReplicatedLog(
1263                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1264
1265         leaderActorContext.setCommitIndex(1);
1266
1267         followerActorContext.getReplicatedLog().removeFrom(0);
1268
1269         // follower too has the exact same log entries and has the same commit index
1270         followerActorContext.setReplicatedLog(
1271                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1272
1273         followerActorContext.setCommitIndex(1);
1274
1275         leader = new Leader(leaderActorContext);
1276         leaderActorContext.setCurrentBehavior(leader);
1277
1278         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1279
1280         assertEquals(-1, appendEntries.getLeaderCommit());
1281         assertEquals(0, appendEntries.getEntries().size());
1282         assertEquals(0, appendEntries.getPrevLogIndex());
1283
1284         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1285                 leaderActor, AppendEntriesReply.class);
1286
1287         assertEquals(2, appendEntriesReply.getLogLastIndex());
1288         assertEquals(1, appendEntriesReply.getLogLastTerm());
1289
1290         // follower returns its next index
1291         assertEquals(2, appendEntriesReply.getLogLastIndex());
1292         assertEquals(1, appendEntriesReply.getLogLastTerm());
1293
1294         follower.close();
1295     }
1296
1297     @Test
1298     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1299         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1300
1301         final MockRaftActorContext leaderActorContext = createActorContext();
1302
1303         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1304         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1305
1306         Follower follower = new Follower(followerActorContext);
1307         followerActor.underlyingActor().setBehavior(follower);
1308         followerActorContext.setCurrentBehavior(follower);
1309
1310         Map<String, String> leaderPeerAddresses = new HashMap<>();
1311         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1312
1313         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1314
1315         leaderActorContext.getReplicatedLog().removeFrom(0);
1316
1317         leaderActorContext.setReplicatedLog(
1318                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1319
1320         leaderActorContext.setCommitIndex(1);
1321
1322         followerActorContext.getReplicatedLog().removeFrom(0);
1323
1324         followerActorContext.setReplicatedLog(
1325                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1326
1327         // follower has the same log entries but its commit index > leaders commit index
1328         followerActorContext.setCommitIndex(2);
1329
1330         leader = new Leader(leaderActorContext);
1331
1332         // Initial heartbeat
1333         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1334
1335         assertEquals(-1, appendEntries.getLeaderCommit());
1336         assertEquals(0, appendEntries.getEntries().size());
1337         assertEquals(0, appendEntries.getPrevLogIndex());
1338
1339         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1340                 leaderActor, AppendEntriesReply.class);
1341
1342         assertEquals(2, appendEntriesReply.getLogLastIndex());
1343         assertEquals(1, appendEntriesReply.getLogLastTerm());
1344
1345         leaderActor.underlyingActor().setBehavior(follower);
1346         leader.handleMessage(followerActor, appendEntriesReply);
1347
1348         leaderActor.underlyingActor().clear();
1349         followerActor.underlyingActor().clear();
1350
1351         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1352                 TimeUnit.MILLISECONDS);
1353
1354         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1355
1356         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1357
1358         assertEquals(2, appendEntries.getLeaderCommit());
1359         assertEquals(0, appendEntries.getEntries().size());
1360         assertEquals(2, appendEntries.getPrevLogIndex());
1361
1362         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1363
1364         assertEquals(2, appendEntriesReply.getLogLastIndex());
1365         assertEquals(1, appendEntriesReply.getLogLastTerm());
1366
1367         assertEquals(2, followerActorContext.getCommitIndex());
1368
1369         follower.close();
1370     }
1371
1372     @Test
1373     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1374         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1375
1376         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1377         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1378                 new FiniteDuration(1000, TimeUnit.SECONDS));
1379
1380         leaderActorContext.setReplicatedLog(
1381                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1382         long leaderCommitIndex = 2;
1383         leaderActorContext.setCommitIndex(leaderCommitIndex);
1384         leaderActorContext.setLastApplied(leaderCommitIndex);
1385
1386         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1387         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1388
1389         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1390
1391         followerActorContext.setReplicatedLog(
1392                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1393         followerActorContext.setCommitIndex(0);
1394         followerActorContext.setLastApplied(0);
1395
1396         Follower follower = new Follower(followerActorContext);
1397         followerActor.underlyingActor().setBehavior(follower);
1398
1399         leader = new Leader(leaderActorContext);
1400
1401         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1402         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1403                 AppendEntriesReply.class);
1404
1405         MessageCollectorActor.clearMessages(followerActor);
1406         MessageCollectorActor.clearMessages(leaderActor);
1407
1408         // Verify initial AppendEntries sent.
1409         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1410         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1411         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1412
1413         leaderActor.underlyingActor().setBehavior(leader);
1414
1415         leader.handleMessage(followerActor, appendEntriesReply);
1416
1417         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1418         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1419
1420         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1421         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1422         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1423
1424         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1425         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1426                 appendEntries.getEntries().get(0).getData());
1427         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1428         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1429                 appendEntries.getEntries().get(1).getData());
1430
1431         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1432         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1433
1434         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1435
1436         ApplyState applyState = applyStateList.get(0);
1437         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1438         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1439         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1440                 applyState.getReplicatedLogEntry().getData());
1441
1442         applyState = applyStateList.get(1);
1443         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1444         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1445         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1446                 applyState.getReplicatedLogEntry().getData());
1447
1448         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1449         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1450     }
1451
1452     @Test
1453     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1454         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1455
1456         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1457         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1458                 new FiniteDuration(1000, TimeUnit.SECONDS));
1459
1460         leaderActorContext.setReplicatedLog(
1461                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1462         long leaderCommitIndex = 1;
1463         leaderActorContext.setCommitIndex(leaderCommitIndex);
1464         leaderActorContext.setLastApplied(leaderCommitIndex);
1465
1466         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1467         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1468
1469         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1470
1471         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1472         followerActorContext.setCommitIndex(-1);
1473         followerActorContext.setLastApplied(-1);
1474
1475         Follower follower = new Follower(followerActorContext);
1476         followerActor.underlyingActor().setBehavior(follower);
1477         followerActorContext.setCurrentBehavior(follower);
1478
1479         leader = new Leader(leaderActorContext);
1480
1481         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1482         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1483                 AppendEntriesReply.class);
1484
1485         MessageCollectorActor.clearMessages(followerActor);
1486         MessageCollectorActor.clearMessages(leaderActor);
1487
1488         // Verify initial AppendEntries sent with the leader's current commit index.
1489         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1490         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1491         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1492
1493         leaderActor.underlyingActor().setBehavior(leader);
1494         leaderActorContext.setCurrentBehavior(leader);
1495
1496         leader.handleMessage(followerActor, appendEntriesReply);
1497
1498         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1499         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1500
1501         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1502         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1503         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1504
1505         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1506         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1507                 appendEntries.getEntries().get(0).getData());
1508         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1509         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1510                 appendEntries.getEntries().get(1).getData());
1511
1512         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1513         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1514
1515         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1516
1517         ApplyState applyState = applyStateList.get(0);
1518         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1519         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1520         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1521                 applyState.getReplicatedLogEntry().getData());
1522
1523         applyState = applyStateList.get(1);
1524         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1525         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1526         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1527                 applyState.getReplicatedLogEntry().getData());
1528
1529         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1530         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1531     }
1532
1533     @Test
1534     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1535         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1536
1537         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1538         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1539                 new FiniteDuration(1000, TimeUnit.SECONDS));
1540
1541         leaderActorContext.setReplicatedLog(
1542                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1543         long leaderCommitIndex = 1;
1544         leaderActorContext.setCommitIndex(leaderCommitIndex);
1545         leaderActorContext.setLastApplied(leaderCommitIndex);
1546
1547         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1548         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1549
1550         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1551
1552         followerActorContext.setReplicatedLog(
1553                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1554         followerActorContext.setCommitIndex(-1);
1555         followerActorContext.setLastApplied(-1);
1556
1557         Follower follower = new Follower(followerActorContext);
1558         followerActor.underlyingActor().setBehavior(follower);
1559         followerActorContext.setCurrentBehavior(follower);
1560
1561         leader = new Leader(leaderActorContext);
1562
1563         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1564         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1565                 AppendEntriesReply.class);
1566
1567         MessageCollectorActor.clearMessages(followerActor);
1568         MessageCollectorActor.clearMessages(leaderActor);
1569
1570         // Verify initial AppendEntries sent with the leader's current commit index.
1571         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1572         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1573         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1574
1575         leaderActor.underlyingActor().setBehavior(leader);
1576         leaderActorContext.setCurrentBehavior(leader);
1577
1578         leader.handleMessage(followerActor, appendEntriesReply);
1579
1580         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1581         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1582
1583         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1584         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1585         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1586
1587         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1588         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1589         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1590                 appendEntries.getEntries().get(0).getData());
1591         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1592         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1593         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1594                 appendEntries.getEntries().get(1).getData());
1595
1596         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1597         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1598
1599         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1600
1601         ApplyState applyState = applyStateList.get(0);
1602         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1603         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1604         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1605                 applyState.getReplicatedLogEntry().getData());
1606
1607         applyState = applyStateList.get(1);
1608         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1609         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1610         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1611                 applyState.getReplicatedLogEntry().getData());
1612
1613         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1614         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1615         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1616     }
1617
1618     @Test
1619     public void testHandleAppendEntriesReplyWithNewerTerm() {
1620         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1621
1622         MockRaftActorContext leaderActorContext = createActorContext();
1623         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1624                 new FiniteDuration(10000, TimeUnit.SECONDS));
1625
1626         leaderActorContext.setReplicatedLog(
1627                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1628
1629         leader = new Leader(leaderActorContext);
1630         leaderActor.underlyingActor().setBehavior(leader);
1631         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1632
1633         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1634                 AppendEntriesReply.class);
1635
1636         assertEquals(false, appendEntriesReply.isSuccess());
1637         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1638
1639         MessageCollectorActor.clearMessages(leaderActor);
1640     }
1641
1642     @Test
1643     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1644         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1645
1646         MockRaftActorContext leaderActorContext = createActorContext();
1647         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1648                 new FiniteDuration(10000, TimeUnit.SECONDS));
1649
1650         leaderActorContext.setReplicatedLog(
1651                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1652         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1653
1654         leader = new Leader(leaderActorContext);
1655         leaderActor.underlyingActor().setBehavior(leader);
1656         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1657
1658         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1659                 AppendEntriesReply.class);
1660
1661         assertEquals(false, appendEntriesReply.isSuccess());
1662         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1663
1664         MessageCollectorActor.clearMessages(leaderActor);
1665     }
1666
1667     @Test
1668     public void testHandleAppendEntriesReplySuccess() throws Exception {
1669         logStart("testHandleAppendEntriesReplySuccess");
1670
1671         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1672
1673         leaderActorContext.setReplicatedLog(
1674                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1675
1676         leaderActorContext.setCommitIndex(1);
1677         leaderActorContext.setLastApplied(1);
1678         leaderActorContext.getTermInformation().update(1, "leader");
1679
1680         leader = new Leader(leaderActorContext);
1681
1682         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1683
1684         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1685         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1686
1687         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1688
1689         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1690
1691         assertEquals(RaftState.Leader, raftActorBehavior.state());
1692
1693         assertEquals(2, leaderActorContext.getCommitIndex());
1694
1695         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1696                 leaderActor, ApplyJournalEntries.class);
1697
1698         assertEquals(2, leaderActorContext.getLastApplied());
1699
1700         assertEquals(2, applyJournalEntries.getToIndex());
1701
1702         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1703                 ApplyState.class);
1704
1705         assertEquals(1,applyStateList.size());
1706
1707         ApplyState applyState = applyStateList.get(0);
1708
1709         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1710
1711         assertEquals(2, followerInfo.getMatchIndex());
1712         assertEquals(3, followerInfo.getNextIndex());
1713         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1714         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1715     }
1716
1717     @Test
1718     public void testHandleAppendEntriesReplyUnknownFollower() {
1719         logStart("testHandleAppendEntriesReplyUnknownFollower");
1720
1721         MockRaftActorContext leaderActorContext = createActorContext();
1722
1723         leader = new Leader(leaderActorContext);
1724
1725         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1726
1727         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1728
1729         assertEquals(RaftState.Leader, raftActorBehavior.state());
1730     }
1731
1732     @Test
1733     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1734         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1735
1736         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1737         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1738                 new FiniteDuration(1000, TimeUnit.SECONDS));
1739         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1740
1741         leaderActorContext.setReplicatedLog(
1742                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1743         long leaderCommitIndex = 3;
1744         leaderActorContext.setCommitIndex(leaderCommitIndex);
1745         leaderActorContext.setLastApplied(leaderCommitIndex);
1746
1747         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1748         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1749         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1750         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1751
1752         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1753
1754         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1755         followerActorContext.setCommitIndex(-1);
1756         followerActorContext.setLastApplied(-1);
1757
1758         Follower follower = new Follower(followerActorContext);
1759         followerActor.underlyingActor().setBehavior(follower);
1760         followerActorContext.setCurrentBehavior(follower);
1761
1762         leader = new Leader(leaderActorContext);
1763
1764         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1765         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1766                 AppendEntriesReply.class);
1767
1768         MessageCollectorActor.clearMessages(followerActor);
1769         MessageCollectorActor.clearMessages(leaderActor);
1770
1771         // Verify initial AppendEntries sent with the leader's current commit index.
1772         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1773         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1774         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1775
1776         leaderActor.underlyingActor().setBehavior(leader);
1777         leaderActorContext.setCurrentBehavior(leader);
1778
1779         leader.handleMessage(followerActor, appendEntriesReply);
1780
1781         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1782                 AppendEntries.class, 2);
1783         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1784
1785         appendEntries = appendEntriesList.get(0);
1786         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1787         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1788         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1789
1790         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1791         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1792                 appendEntries.getEntries().get(0).getData());
1793         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1794         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1795                 appendEntries.getEntries().get(1).getData());
1796
1797         appendEntries = appendEntriesList.get(1);
1798         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1799         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1800         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1801
1802         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1803         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1804                 appendEntries.getEntries().get(0).getData());
1805         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1806         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1807                 appendEntries.getEntries().get(1).getData());
1808
1809         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1810         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1811
1812         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1813
1814         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1815         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1816     }
1817
1818     @Test
1819     public void testHandleRequestVoteReply() {
1820         logStart("testHandleRequestVoteReply");
1821
1822         MockRaftActorContext leaderActorContext = createActorContext();
1823
1824         leader = new Leader(leaderActorContext);
1825
1826         // Should be a no-op.
1827         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1828                 new RequestVoteReply(1, true));
1829
1830         assertEquals(RaftState.Leader, raftActorBehavior.state());
1831
1832         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1833
1834         assertEquals(RaftState.Leader, raftActorBehavior.state());
1835     }
1836
1837     @Test
1838     public void testIsolatedLeaderCheckNoFollowers() {
1839         logStart("testIsolatedLeaderCheckNoFollowers");
1840
1841         MockRaftActorContext leaderActorContext = createActorContext();
1842
1843         leader = new Leader(leaderActorContext);
1844         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1845         assertTrue(newBehavior instanceof Leader);
1846     }
1847
1848     @Test
1849     public void testIsolatedLeaderCheckNoVotingFollowers() {
1850         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1851
1852         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1853         Follower follower = new Follower(followerActorContext);
1854         followerActor.underlyingActor().setBehavior(follower);
1855
1856         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1857         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1858                 new FiniteDuration(1000, TimeUnit.SECONDS));
1859         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1860
1861         leader = new Leader(leaderActorContext);
1862         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1863         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1864         assertTrue("Expected Leader", newBehavior instanceof Leader);
1865     }
1866
1867     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1868         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1869         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1870
1871         MockRaftActorContext leaderActorContext = createActorContext();
1872
1873         Map<String, String> peerAddresses = new HashMap<>();
1874         peerAddresses.put("follower-1", followerActor1.path().toString());
1875         peerAddresses.put("follower-2", followerActor2.path().toString());
1876
1877         leaderActorContext.setPeerAddresses(peerAddresses);
1878         leaderActorContext.setRaftPolicy(raftPolicy);
1879
1880         leader = new Leader(leaderActorContext);
1881
1882         leader.markFollowerActive("follower-1");
1883         leader.markFollowerActive("follower-2");
1884         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1885         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1886
1887         // kill 1 follower and verify if that got killed
1888         final JavaTestKit probe = new JavaTestKit(getSystem());
1889         probe.watch(followerActor1);
1890         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1891         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1892         assertEquals(termMsg1.getActor(), followerActor1);
1893
1894         leader.markFollowerInActive("follower-1");
1895         leader.markFollowerActive("follower-2");
1896         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1897         assertTrue("Behavior not instance of Leader when majority of followers are active",
1898                 newBehavior instanceof Leader);
1899
1900         // kill 2nd follower and leader should change to Isolated leader
1901         followerActor2.tell(PoisonPill.getInstance(), null);
1902         probe.watch(followerActor2);
1903         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1904         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1905         assertEquals(termMsg2.getActor(), followerActor2);
1906
1907         leader.markFollowerInActive("follower-2");
1908         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1909     }
1910
1911     @Test
1912     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1913         logStart("testIsolatedLeaderCheckTwoFollowers");
1914
1915         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1916
1917         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1918             newBehavior instanceof IsolatedLeader);
1919     }
1920
1921     @Test
1922     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1923         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1924
1925         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1926
1927         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1928                 newBehavior instanceof Leader);
1929     }
1930
1931     @Test
1932     public void testLaggingFollowerStarvation() throws Exception {
1933         logStart("testLaggingFollowerStarvation");
1934
1935         String leaderActorId = actorFactory.generateActorId("leader");
1936         String follower1ActorId = actorFactory.generateActorId("follower");
1937         String follower2ActorId = actorFactory.generateActorId("follower");
1938
1939         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1940         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1941
1942         MockRaftActorContext leaderActorContext =
1943                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1944
1945         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1946         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1947         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1948
1949         leaderActorContext.setConfigParams(configParams);
1950
1951         leaderActorContext.setReplicatedLog(
1952                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1953
1954         Map<String, String> peerAddresses = new HashMap<>();
1955         peerAddresses.put(follower1ActorId,
1956                 follower1Actor.path().toString());
1957         peerAddresses.put(follower2ActorId,
1958                 follower2Actor.path().toString());
1959
1960         leaderActorContext.setPeerAddresses(peerAddresses);
1961         leaderActorContext.getTermInformation().update(1, leaderActorId);
1962
1963         leader = createBehavior(leaderActorContext);
1964
1965         leaderActor.underlyingActor().setBehavior(leader);
1966
1967         for (int i = 1; i < 6; i++) {
1968             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1969             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1970                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1971             assertTrue(newBehavior == leader);
1972             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1973         }
1974
1975         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1976         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1977
1978         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1979                 heartbeats.size() > 1);
1980
1981         // Check if follower-2 got AppendEntries during this time and was not starved
1982         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1983
1984         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1985                 appendEntries.size() > 1);
1986     }
1987
1988     @Test
1989     public void testReplicationConsensusWithNonVotingFollower() {
1990         logStart("testReplicationConsensusWithNonVotingFollower");
1991
1992         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1993         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1994                 new FiniteDuration(1000, TimeUnit.SECONDS));
1995
1996         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1997         leaderActorContext.setCommitIndex(-1);
1998         leaderActorContext.setLastApplied(-1);
1999
2000         String nonVotingFollowerId = "nonvoting-follower";
2001         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
2002                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
2003
2004         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2005                 VotingState.NON_VOTING);
2006
2007         leader = new Leader(leaderActorContext);
2008         leaderActorContext.setCurrentBehavior(leader);
2009
2010         // Ignore initial heartbeats
2011         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2012         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2013
2014         MessageCollectorActor.clearMessages(followerActor);
2015         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2016         MessageCollectorActor.clearMessages(leaderActor);
2017
2018         // Send a Replicate message and wait for AppendEntries.
2019         sendReplicate(leaderActorContext, 0);
2020
2021         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2022         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2023
2024         // Send reply only from the voting follower and verify consensus via ApplyState.
2025         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2026
2027         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2028
2029         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2030
2031         MessageCollectorActor.clearMessages(followerActor);
2032         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2033         MessageCollectorActor.clearMessages(leaderActor);
2034
2035         // Send another Replicate message
2036         sendReplicate(leaderActorContext, 1);
2037
2038         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2039         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2040                 AppendEntries.class);
2041         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2042         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2043
2044         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2045         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2046
2047         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2048
2049         // Send reply from the voting follower and verify consensus.
2050         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2051
2052         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2053     }
2054
2055     @Test
2056     public void testTransferLeadershipWithFollowerInSync() {
2057         logStart("testTransferLeadershipWithFollowerInSync");
2058
2059         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2060         leaderActorContext.setLastApplied(-1);
2061         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2062                 new FiniteDuration(1000, TimeUnit.SECONDS));
2063         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2064
2065         leader = new Leader(leaderActorContext);
2066         leaderActorContext.setCurrentBehavior(leader);
2067
2068         // Initial heartbeat
2069         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2070         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2071         MessageCollectorActor.clearMessages(followerActor);
2072
2073         sendReplicate(leaderActorContext, 0);
2074         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2075
2076         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2077         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2078         MessageCollectorActor.clearMessages(followerActor);
2079
2080         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2081         leader.transferLeadership(mockTransferCohort);
2082
2083         verify(mockTransferCohort, never()).transferComplete();
2084         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2085         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2086         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2087
2088         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2089         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2090
2091         // Leader should force an election timeout
2092         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2093
2094         verify(mockTransferCohort).transferComplete();
2095     }
2096
2097     @Test
2098     public void testTransferLeadershipWithEmptyLog() {
2099         logStart("testTransferLeadershipWithEmptyLog");
2100
2101         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2102         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2103                 new FiniteDuration(1000, TimeUnit.SECONDS));
2104         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2105
2106         leader = new Leader(leaderActorContext);
2107         leaderActorContext.setCurrentBehavior(leader);
2108
2109         // Initial heartbeat
2110         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2111         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2112         MessageCollectorActor.clearMessages(followerActor);
2113
2114         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2115         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2116         leader.transferLeadership(mockTransferCohort);
2117
2118         verify(mockTransferCohort, never()).transferComplete();
2119         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2120         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2121
2122         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2123         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2124
2125         // Leader should force an election timeout
2126         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2127
2128         verify(mockTransferCohort).transferComplete();
2129     }
2130
2131     @Test
2132     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2133         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2134
2135         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2136         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2137                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2138
2139         leader = new Leader(leaderActorContext);
2140         leaderActorContext.setCurrentBehavior(leader);
2141
2142         // Initial heartbeat
2143         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2144         MessageCollectorActor.clearMessages(followerActor);
2145
2146         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2147         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2148         leader.transferLeadership(mockTransferCohort);
2149
2150         verify(mockTransferCohort, never()).transferComplete();
2151
2152         // Sync up the follower.
2153         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2155         MessageCollectorActor.clearMessages(followerActor);
2156
2157         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2158                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2159         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2160         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2161         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2162
2163         // Leader should force an election timeout
2164         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2165
2166         verify(mockTransferCohort).transferComplete();
2167     }
2168
2169     @Test
2170     public void testTransferLeadershipWithFollowerSyncTimeout() {
2171         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2172
2173         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2174         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2175                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2176         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2177         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2178
2179         leader = new Leader(leaderActorContext);
2180         leaderActorContext.setCurrentBehavior(leader);
2181
2182         // Initial heartbeat
2183         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2184         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2185         MessageCollectorActor.clearMessages(followerActor);
2186
2187         sendReplicate(leaderActorContext, 0);
2188         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2189
2190         MessageCollectorActor.clearMessages(followerActor);
2191
2192         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2193         leader.transferLeadership(mockTransferCohort);
2194
2195         verify(mockTransferCohort, never()).transferComplete();
2196
2197         // Send heartbeats to time out the transfer.
2198         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2199             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2200                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2201             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2202         }
2203
2204         verify(mockTransferCohort).abortTransfer();
2205         verify(mockTransferCohort, never()).transferComplete();
2206         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2207     }
2208
2209     @Override
2210     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2211             ActorRef actorRef, RaftRPC rpc) throws Exception {
2212         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2213         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2214     }
2215
2216     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2217
2218         private final long electionTimeOutIntervalMillis;
2219         private final int snapshotChunkSize;
2220
2221         MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2222             super();
2223             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2224             this.snapshotChunkSize = snapshotChunkSize;
2225         }
2226
2227         @Override
2228         public FiniteDuration getElectionTimeOutInterval() {
2229             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2230         }
2231
2232         @Override
2233         public int getSnapshotChunkSize() {
2234             return snapshotChunkSize;
2235         }
2236     }
2237 }