Change ReplicatedLogImplEntry to Externalizable proxy pattern
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.google.protobuf.ByteString;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.TimeUnit;
36 import org.junit.After;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
39 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
40 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
41 import org.opendaylight.controller.cluster.raft.RaftActorContext;
42 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
43 import org.opendaylight.controller.cluster.raft.RaftState;
44 import org.opendaylight.controller.cluster.raft.RaftVersions;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.Snapshot;
47 import org.opendaylight.controller.cluster.raft.VotingState;
48 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
63 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
64 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
65 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.yangtools.concepts.Identifier;
68 import scala.concurrent.duration.FiniteDuration;
69
70 public class LeaderTest extends AbstractLeaderTest<Leader> {
71
    // Id under which the follower peer is addressed, e.g. in AppendEntriesReply and getFollower().
    static final String FOLLOWER_ID = "follower";
    // Id for the leader. NOTE(review): not referenced in the visible part of this file - confirm usage.
    public static final String LEADER_ID = "leader";

    // Test actor the tests pass as the sender when delivering leader-side messages (Replicate,
    // SendHeartBeat) and whose collected messages are inspected for ApplyState.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; tests inspect the AppendEntries/InstallSnapshot it collects.
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Behavior under test; assigned by each test and closed in tearDown() when non-null.
    private Leader leader;
    // Payload version set on the actor context and expected back in the AppendEntries messages.
    private final short payloadVersion = 5;
83
84     @Override
85     @After
86     public void tearDown() throws Exception {
87         if (leader != null) {
88             leader.close();
89         }
90
91         super.tearDown();
92     }
93
94     @Test
95     public void testHandleMessageForUnknownMessage() throws Exception {
96         logStart("testHandleMessageForUnknownMessage");
97
98         leader = new Leader(createActorContext());
99
100         // handle message should null when it receives an unknown message
101         assertNull(leader.handleMessage(followerActor, "foo"));
102     }
103
104     @Test
105     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
106         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
107
108         MockRaftActorContext actorContext = createActorContextWithFollower();
109         actorContext.setCommitIndex(-1);
110         actorContext.setPayloadVersion(payloadVersion);
111
112         long term = 1;
113         actorContext.getTermInformation().update(term, "");
114
115         leader = new Leader(actorContext);
116         actorContext.setCurrentBehavior(leader);
117
118         // Leader should send an immediate heartbeat with no entries as follower is inactive.
119         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
120         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getTerm", term, appendEntries.getTerm());
122         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
123         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
124         assertEquals("Entries size", 0, appendEntries.getEntries().size());
125         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
126
127         // The follower would normally reply - simulate that explicitly here.
128         leader.handleMessage(followerActor, new AppendEntriesReply(
129                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
130         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
131
132         followerActor.underlyingActor().clear();
133
134         // Sleep for the heartbeat interval so AppendEntries is sent.
135         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
136                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
137
138         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
139
140         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
141         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
142         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
143         assertEquals("Entries size", 1, appendEntries.getEntries().size());
144         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
145         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
146         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
147     }
148
149
150     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
151         return sendReplicate(actorContext, 1, index);
152     }
153
154     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
155         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
156         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
157                 term, index, payload);
158         actorContext.getReplicatedLog().append(newEntry);
159         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
160     }
161
162     @Test
163     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
164         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
165
166         MockRaftActorContext actorContext = createActorContextWithFollower();
167
168         long term = 1;
169         actorContext.getTermInformation().update(term, "");
170
171         leader = new Leader(actorContext);
172
173         // Leader will send an immediate heartbeat - ignore it.
174         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
175
176         // The follower would normally reply - simulate that explicitly here.
177         long lastIndex = actorContext.getReplicatedLog().lastIndex();
178         leader.handleMessage(followerActor, new AppendEntriesReply(
179                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
180         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
181
182         followerActor.underlyingActor().clear();
183
184         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
185
186         // State should not change
187         assertTrue(raftBehavior instanceof Leader);
188
189         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
190         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
191         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
192         assertEquals("Entries size", 1, appendEntries.getEntries().size());
193         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
194         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
195         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
196         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
197     }
198
199     @Test
200     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
201         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
202
203         MockRaftActorContext actorContext = createActorContextWithFollower();
204         actorContext.setCommitIndex(-1);
205         actorContext.setLastApplied(-1);
206
207         // The raft context is initialized with a couple log entries. However the commitIndex
208         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
209         // committed and applied. Now it regains leadership with a higher term (2).
210         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
211         long newTerm = prevTerm + 1;
212         actorContext.getTermInformation().update(newTerm, "");
213
214         leader = new Leader(actorContext);
215         actorContext.setCurrentBehavior(leader);
216
217         // Leader will send an immediate heartbeat - ignore it.
218         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
219
220         // The follower replies with the leader's current last index and term, simulating that it is
221         // up to date with the leader.
222         long lastIndex = actorContext.getReplicatedLog().lastIndex();
223         leader.handleMessage(followerActor, new AppendEntriesReply(
224                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
225
226         // The commit index should not get updated even though consensus was reached. This is b/c the
227         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
228         // from previous terms by counting replicas".
229         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
230
231         followerActor.underlyingActor().clear();
232
233         // Now replicate a new entry with the new term 2.
234         long newIndex = lastIndex + 1;
235         sendReplicate(actorContext, newTerm, newIndex);
236
237         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
238         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
239         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
240         assertEquals("Entries size", 1, appendEntries.getEntries().size());
241         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
242         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
243         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
244
245         // The follower replies with success. The leader should now update the commit index to the new index
246         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
247         // prior entries are committed indirectly".
248         leader.handleMessage(followerActor, new AppendEntriesReply(
249                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
250
251         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
252     }
253
254     @Test
255     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
256         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
257
258         MockRaftActorContext actorContext = createActorContextWithFollower();
259         actorContext.setRaftPolicy(createRaftPolicy(true, true));
260
261         long term = 1;
262         actorContext.getTermInformation().update(term, "");
263
264         leader = new Leader(actorContext);
265
266         // Leader will send an immediate heartbeat - ignore it.
267         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
268
269         // The follower would normally reply - simulate that explicitly here.
270         long lastIndex = actorContext.getReplicatedLog().lastIndex();
271         leader.handleMessage(followerActor, new AppendEntriesReply(
272                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
273         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
274
275         followerActor.underlyingActor().clear();
276
277         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
278
279         // State should not change
280         assertTrue(raftBehavior instanceof Leader);
281
282         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
284         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
285         assertEquals("Entries size", 1, appendEntries.getEntries().size());
286         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
287         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
288         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
289         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
290     }
291
292     @Test
293     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
294         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
295
296         MockRaftActorContext actorContext = createActorContextWithFollower();
297         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
298             @Override
299             public FiniteDuration getHeartBeatInterval() {
300                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
301             }
302         });
303
304         long term = 1;
305         actorContext.getTermInformation().update(term, "");
306
307         leader = new Leader(actorContext);
308
309         // Leader will send an immediate heartbeat - ignore it.
310         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
311
312         // The follower would normally reply - simulate that explicitly here.
313         long lastIndex = actorContext.getReplicatedLog().lastIndex();
314         leader.handleMessage(followerActor, new AppendEntriesReply(
315                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
316         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
317
318         followerActor.underlyingActor().clear();
319
320         for (int i = 0; i < 5; i++) {
321             sendReplicate(actorContext, lastIndex + i + 1);
322         }
323
324         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
325         // We expect only 1 message to be sent because of two reasons,
326         // - an append entries reply was not received
327         // - the heartbeat interval has not expired
328         // In this scenario if multiple messages are sent they would likely be duplicates
329         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
330     }
331
332     @Test
333     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
334         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
335
336         MockRaftActorContext actorContext = createActorContextWithFollower();
337         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
338             @Override
339             public FiniteDuration getHeartBeatInterval() {
340                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
341             }
342         });
343
344         long term = 1;
345         actorContext.getTermInformation().update(term, "");
346
347         leader = new Leader(actorContext);
348
349         // Leader will send an immediate heartbeat - ignore it.
350         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
351
352         // The follower would normally reply - simulate that explicitly here.
353         long lastIndex = actorContext.getReplicatedLog().lastIndex();
354         leader.handleMessage(followerActor, new AppendEntriesReply(
355                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
356         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
357
358         followerActor.underlyingActor().clear();
359
360         for (int i = 0; i < 3; i++) {
361             sendReplicate(actorContext, lastIndex + i + 1);
362             leader.handleMessage(followerActor, new AppendEntriesReply(
363                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
364
365         }
366
367         for (int i = 3; i < 5; i++) {
368             sendReplicate(actorContext, lastIndex + i + 1);
369         }
370
371         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
372         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
373         // get sent to the follower - but not the 5th
374         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
375
376         for (int i = 0; i < 4; i++) {
377             long expected = allMessages.get(i).getEntries().get(0).getIndex();
378             assertEquals(expected, i + 2);
379         }
380     }
381
382     @Test
383     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
384         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
385
386         MockRaftActorContext actorContext = createActorContextWithFollower();
387         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
388             @Override
389             public FiniteDuration getHeartBeatInterval() {
390                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
391             }
392         });
393
394         long term = 1;
395         actorContext.getTermInformation().update(term, "");
396
397         leader = new Leader(actorContext);
398
399         // Leader will send an immediate heartbeat - ignore it.
400         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
401
402         // The follower would normally reply - simulate that explicitly here.
403         long lastIndex = actorContext.getReplicatedLog().lastIndex();
404         leader.handleMessage(followerActor, new AppendEntriesReply(
405                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
406         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
407
408         followerActor.underlyingActor().clear();
409
410         sendReplicate(actorContext, lastIndex + 1);
411
412         // Wait slightly longer than heartbeat duration
413         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
414
415         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
416
417         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
418         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
419
420         assertEquals(1, allMessages.get(0).getEntries().size());
421         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
422         assertEquals(1, allMessages.get(1).getEntries().size());
423         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
424
425     }
426
427     @Test
428     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
429         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
430
431         MockRaftActorContext actorContext = createActorContextWithFollower();
432         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
433             @Override
434             public FiniteDuration getHeartBeatInterval() {
435                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
436             }
437         });
438
439         long term = 1;
440         actorContext.getTermInformation().update(term, "");
441
442         leader = new Leader(actorContext);
443
444         // Leader will send an immediate heartbeat - ignore it.
445         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
446
447         // The follower would normally reply - simulate that explicitly here.
448         long lastIndex = actorContext.getReplicatedLog().lastIndex();
449         leader.handleMessage(followerActor, new AppendEntriesReply(
450                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
451         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
452
453         followerActor.underlyingActor().clear();
454
455         for (int i = 0; i < 3; i++) {
456             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
457             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
458         }
459
460         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
461         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
462     }
463
464     @Test
465     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
466         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
467
468         MockRaftActorContext actorContext = createActorContextWithFollower();
469         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
470             @Override
471             public FiniteDuration getHeartBeatInterval() {
472                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
473             }
474         });
475
476         long term = 1;
477         actorContext.getTermInformation().update(term, "");
478
479         leader = new Leader(actorContext);
480
481         // Leader will send an immediate heartbeat - ignore it.
482         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
483
484         // The follower would normally reply - simulate that explicitly here.
485         long lastIndex = actorContext.getReplicatedLog().lastIndex();
486         leader.handleMessage(followerActor, new AppendEntriesReply(
487                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
488         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
489
490         followerActor.underlyingActor().clear();
491
492         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
493         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
494         sendReplicate(actorContext, lastIndex + 1);
495
496         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
497         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
498
499         assertEquals(0, allMessages.get(0).getEntries().size());
500         assertEquals(1, allMessages.get(1).getEntries().size());
501     }
502
503
504     @Test
505     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
506         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
507
508         MockRaftActorContext actorContext = createActorContext();
509
510         leader = new Leader(actorContext);
511
512         actorContext.setLastApplied(0);
513
514         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
515         long term = actorContext.getTermInformation().getCurrentTerm();
516         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
517                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
518
519         actorContext.getReplicatedLog().append(newEntry);
520
521         final Identifier id = new MockIdentifier("state-id");
522         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
523
524         // State should not change
525         assertTrue(raftBehavior instanceof Leader);
526
527         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
528
529         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
530         // one since lastApplied state is 0.
531         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
532                 leaderActor, ApplyState.class);
533         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
534
535         for (int i = 0; i <= newLogIndex - 1; i++ ) {
536             ApplyState applyState = applyStateList.get(i);
537             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
538             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
539         }
540
541         ApplyState last = applyStateList.get((int) newLogIndex - 1);
542         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
543         assertEquals("getIdentifier", id, last.getIdentifier());
544     }
545
546     @Test
547     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
548         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
549
550         final MockRaftActorContext actorContext = createActorContextWithFollower();
551
552         Map<String, String> leadersSnapshot = new HashMap<>();
553         leadersSnapshot.put("1", "A");
554         leadersSnapshot.put("2", "B");
555         leadersSnapshot.put("3", "C");
556
557         //clears leaders log
558         actorContext.getReplicatedLog().removeFrom(0);
559
560         final int commitIndex = 3;
561         final int snapshotIndex = 2;
562         final int snapshotTerm = 1;
563
564         // set the snapshot variables in replicatedlog
565         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567         actorContext.setCommitIndex(commitIndex);
568         //set follower timeout to 2 mins, helps during debugging
569         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
570
571         leader = new Leader(actorContext);
572
573         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
574         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
575
576         //update follower timestamp
577         leader.markFollowerActive(FOLLOWER_ID);
578
579         ByteString bs = toByteString(leadersSnapshot);
580         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
581                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
582         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
583                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
584         fts.setSnapshotBytes(bs);
585         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
586
587         //send first chunk and no InstallSnapshotReply received yet
588         fts.getNextChunk();
589         fts.incrementChunkIndex();
590
591         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
592                 TimeUnit.MILLISECONDS);
593
594         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
595
596         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
597
598         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
599
600         //InstallSnapshotReply received
601         fts.markSendStatus(true);
602
603         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
604
605         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
606
607         assertEquals(commitIndex, is.getLastIncludedIndex());
608     }
609
610     @Test
611     public void testSendAppendEntriesSnapshotScenario() throws Exception {
612         logStart("testSendAppendEntriesSnapshotScenario");
613
614         final MockRaftActorContext actorContext = createActorContextWithFollower();
615
616         Map<String, String> leadersSnapshot = new HashMap<>();
617         leadersSnapshot.put("1", "A");
618         leadersSnapshot.put("2", "B");
619         leadersSnapshot.put("3", "C");
620
621         //clears leaders log
622         actorContext.getReplicatedLog().removeFrom(0);
623
624         final int followersLastIndex = 2;
625         final int snapshotIndex = 3;
626         final int newEntryIndex = 4;
627         final int snapshotTerm = 1;
628         final int currentTerm = 2;
629
630         // set the snapshot variables in replicatedlog
631         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
632         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
633         actorContext.setCommitIndex(followersLastIndex);
634
635         leader = new Leader(actorContext);
636
637         // Leader will send an immediate heartbeat - ignore it.
638         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
639
640         // new entry
641         SimpleReplicatedLogEntry entry =
642                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
643                         new MockRaftActorContext.MockPayload("D"));
644
645         actorContext.getReplicatedLog().append(entry);
646
647         //update follower timestamp
648         leader.markFollowerActive(FOLLOWER_ID);
649
650         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
651         RaftActorBehavior raftBehavior = leader.handleMessage(
652                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
653
654         assertTrue(raftBehavior instanceof Leader);
655
656         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
657     }
658
659     @Test
660     public void testInitiateInstallSnapshot() throws Exception {
661         logStart("testInitiateInstallSnapshot");
662
663         MockRaftActorContext actorContext = createActorContextWithFollower();
664
665         //clears leaders log
666         actorContext.getReplicatedLog().removeFrom(0);
667
668         final int followersLastIndex = 2;
669         final int snapshotIndex = 3;
670         final int newEntryIndex = 4;
671         final int snapshotTerm = 1;
672         final int currentTerm = 2;
673
674         // set the snapshot variables in replicatedlog
675         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
676         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
677         actorContext.setLastApplied(3);
678         actorContext.setCommitIndex(followersLastIndex);
679
680         leader = new Leader(actorContext);
681
682         // Leader will send an immediate heartbeat - ignore it.
683         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
684
685         // set the snapshot as absent and check if capture-snapshot is invoked.
686         leader.setSnapshot(null);
687
688         // new entry
689         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
690                 new MockRaftActorContext.MockPayload("D"));
691
692         actorContext.getReplicatedLog().append(entry);
693
694         //update follower timestamp
695         leader.markFollowerActive(FOLLOWER_ID);
696
697         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
698
699         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
700
701         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
702
703         assertTrue(cs.isInstallSnapshotInitiated());
704         assertEquals(3, cs.getLastAppliedIndex());
705         assertEquals(1, cs.getLastAppliedTerm());
706         assertEquals(4, cs.getLastIndex());
707         assertEquals(2, cs.getLastTerm());
708
709         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
710         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
711
712         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
713     }
714
715     @Test
716     public void testInitiateForceInstallSnapshot() throws Exception {
717         logStart("testInitiateForceInstallSnapshot");
718
719         MockRaftActorContext actorContext = createActorContextWithFollower();
720
721         final int followersLastIndex = 2;
722         final int snapshotIndex = -1;
723         final int newEntryIndex = 4;
724         final int snapshotTerm = -1;
725         final int currentTerm = 2;
726
727         // set the snapshot variables in replicatedlog
728         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
729         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
730         actorContext.setLastApplied(3);
731         actorContext.setCommitIndex(followersLastIndex);
732
733         actorContext.getReplicatedLog().removeFrom(0);
734
735         leader = new Leader(actorContext);
736         actorContext.setCurrentBehavior(leader);
737
738         // Leader will send an immediate heartbeat - ignore it.
739         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
740
741         // set the snapshot as absent and check if capture-snapshot is invoked.
742         leader.setSnapshot(null);
743
744         for (int i = 0; i < 4; i++) {
745             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
746                     new MockRaftActorContext.MockPayload("X" + i)));
747         }
748
749         // new entry
750         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
751                 new MockRaftActorContext.MockPayload("D"));
752
753         actorContext.getReplicatedLog().append(entry);
754
755         //update follower timestamp
756         leader.markFollowerActive(FOLLOWER_ID);
757
758         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
759         // installed with a SendInstallSnapshot
760         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
761
762         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
763
764         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
765
766         assertTrue(cs.isInstallSnapshotInitiated());
767         assertEquals(3, cs.getLastAppliedIndex());
768         assertEquals(1, cs.getLastAppliedTerm());
769         assertEquals(4, cs.getLastIndex());
770         assertEquals(2, cs.getLastTerm());
771
772         // if an initiate is started again when first is in progress, it should not initiate Capture
773         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
774
775         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
776     }
777
778
779     @Test
780     public void testInstallSnapshot() throws Exception {
781         logStart("testInstallSnapshot");
782
783         final MockRaftActorContext actorContext = createActorContextWithFollower();
784
785         Map<String, String> leadersSnapshot = new HashMap<>();
786         leadersSnapshot.put("1", "A");
787         leadersSnapshot.put("2", "B");
788         leadersSnapshot.put("3", "C");
789
790         //clears leaders log
791         actorContext.getReplicatedLog().removeFrom(0);
792
793         final int lastAppliedIndex = 3;
794         final int snapshotIndex = 2;
795         final int snapshotTerm = 1;
796         final int currentTerm = 2;
797
798         // set the snapshot variables in replicatedlog
799         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
800         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
801         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
802         actorContext.setCommitIndex(lastAppliedIndex);
803         actorContext.setLastApplied(lastAppliedIndex);
804
805         leader = new Leader(actorContext);
806
807         // Initial heartbeat.
808         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
809
810         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
811         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
812
813         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
814                 Collections.<ReplicatedLogEntry>emptyList(),
815                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
816
817         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
818
819         assertTrue(raftBehavior instanceof Leader);
820
821         // check if installsnapshot gets called with the correct values.
822
823         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
824                 InstallSnapshot.class);
825
826         assertNotNull(installSnapshot.getData());
827         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
828         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
829
830         assertEquals(currentTerm, installSnapshot.getTerm());
831     }
832
833     @Test
834     public void testForceInstallSnapshot() throws Exception {
835         logStart("testForceInstallSnapshot");
836
837         final MockRaftActorContext actorContext = createActorContextWithFollower();
838
839         Map<String, String> leadersSnapshot = new HashMap<>();
840         leadersSnapshot.put("1", "A");
841         leadersSnapshot.put("2", "B");
842         leadersSnapshot.put("3", "C");
843
844         final int lastAppliedIndex = 3;
845         final int snapshotIndex = -1;
846         final int snapshotTerm = -1;
847         final int currentTerm = 2;
848
849         // set the snapshot variables in replicatedlog
850         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
851         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
852         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
853         actorContext.setCommitIndex(lastAppliedIndex);
854         actorContext.setLastApplied(lastAppliedIndex);
855
856         leader = new Leader(actorContext);
857
858         // Initial heartbeat.
859         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
860
861         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
862         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
863
864         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
865                 Collections.<ReplicatedLogEntry>emptyList(),
866                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
867
868         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
869
870         assertTrue(raftBehavior instanceof Leader);
871
872         // check if installsnapshot gets called with the correct values.
873
874         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
875                 InstallSnapshot.class);
876
877         assertNotNull(installSnapshot.getData());
878         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
879         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
880
881         assertEquals(currentTerm, installSnapshot.getTerm());
882     }
883
884     @Test
885     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
886         logStart("testHandleInstallSnapshotReplyLastChunk");
887
888         MockRaftActorContext actorContext = createActorContextWithFollower();
889
890         final int commitIndex = 3;
891         final int snapshotIndex = 2;
892         final int snapshotTerm = 1;
893         final int currentTerm = 2;
894
895         actorContext.setCommitIndex(commitIndex);
896
897         leader = new Leader(actorContext);
898         actorContext.setCurrentBehavior(leader);
899
900         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
901         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
902
903         // Ignore initial heartbeat.
904         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
905
906         Map<String, String> leadersSnapshot = new HashMap<>();
907         leadersSnapshot.put("1", "A");
908         leadersSnapshot.put("2", "B");
909         leadersSnapshot.put("3", "C");
910
911         // set the snapshot variables in replicatedlog
912
913         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
914         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
915         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
916
917         ByteString bs = toByteString(leadersSnapshot);
918         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
919                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
920         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
921                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
922         fts.setSnapshotBytes(bs);
923         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
924         while (!fts.isLastChunk(fts.getChunkIndex())) {
925             fts.getNextChunk();
926             fts.incrementChunkIndex();
927         }
928
929         //clears leaders log
930         actorContext.getReplicatedLog().removeFrom(0);
931
932         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
933                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
934
935         assertTrue(raftBehavior instanceof Leader);
936
937         assertEquals(1, leader.followerLogSize());
938         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
939         assertNotNull(fli);
940         assertNull(fli.getInstallSnapshotState());
941         assertEquals(commitIndex, fli.getMatchIndex());
942         assertEquals(commitIndex + 1, fli.getNextIndex());
943         assertFalse(leader.hasSnapshot());
944     }
945
946     @Test
947     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
948         logStart("testSendSnapshotfromInstallSnapshotReply");
949
950         MockRaftActorContext actorContext = createActorContextWithFollower();
951
952         final int commitIndex = 3;
953         final int snapshotIndex = 2;
954         final int snapshotTerm = 1;
955         final int currentTerm = 2;
956
957         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
958             @Override
959             public int getSnapshotChunkSize() {
960                 return 50;
961             }
962         };
963         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
964         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
965
966         actorContext.setConfigParams(configParams);
967         actorContext.setCommitIndex(commitIndex);
968
969         leader = new Leader(actorContext);
970         actorContext.setCurrentBehavior(leader);
971
972         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
973         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
974
975         Map<String, String> leadersSnapshot = new HashMap<>();
976         leadersSnapshot.put("1", "A");
977         leadersSnapshot.put("2", "B");
978         leadersSnapshot.put("3", "C");
979
980         // set the snapshot variables in replicatedlog
981         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
982         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
983         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
984
985         ByteString bs = toByteString(leadersSnapshot);
986         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
987                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
988         leader.setSnapshot(snapshot);
989
990         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
991
992         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
993                 InstallSnapshot.class);
994
995         assertEquals(1, installSnapshot.getChunkIndex());
996         assertEquals(3, installSnapshot.getTotalChunks());
997
998         followerActor.underlyingActor().clear();
999         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1000                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1001
1002         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1003
1004         assertEquals(2, installSnapshot.getChunkIndex());
1005         assertEquals(3, installSnapshot.getTotalChunks());
1006
1007         followerActor.underlyingActor().clear();
1008         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1009                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1010
1011         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1012
1013         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1014         followerActor.underlyingActor().clear();
1015         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1016                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1017
1018         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1019
1020         assertNull(installSnapshot);
1021     }
1022
1023
1024     @Test
1025     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
1026         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1027
1028         MockRaftActorContext actorContext = createActorContextWithFollower();
1029
1030         final int commitIndex = 3;
1031         final int snapshotIndex = 2;
1032         final int snapshotTerm = 1;
1033         final int currentTerm = 2;
1034
1035         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1036             @Override
1037             public int getSnapshotChunkSize() {
1038                 return 50;
1039             }
1040         });
1041
1042         actorContext.setCommitIndex(commitIndex);
1043
1044         leader = new Leader(actorContext);
1045
1046         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1048
1049         Map<String, String> leadersSnapshot = new HashMap<>();
1050         leadersSnapshot.put("1", "A");
1051         leadersSnapshot.put("2", "B");
1052         leadersSnapshot.put("3", "C");
1053
1054         // set the snapshot variables in replicatedlog
1055         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1058
1059         ByteString bs = toByteString(leadersSnapshot);
1060         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062         leader.setSnapshot(snapshot);
1063
1064         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1065         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1066
1067         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1068                 InstallSnapshot.class);
1069
1070         assertEquals(1, installSnapshot.getChunkIndex());
1071         assertEquals(3, installSnapshot.getTotalChunks());
1072
1073         followerActor.underlyingActor().clear();
1074
1075         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1076                 FOLLOWER_ID, -1, false));
1077
1078         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1079                 TimeUnit.MILLISECONDS);
1080
1081         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1082
1083         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1084
1085         assertEquals(1, installSnapshot.getChunkIndex());
1086         assertEquals(3, installSnapshot.getTotalChunks());
1087     }
1088
1089     @Test
1090     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1091         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1092
1093         MockRaftActorContext actorContext = createActorContextWithFollower();
1094
1095         final int commitIndex = 3;
1096         final int snapshotIndex = 2;
1097         final int snapshotTerm = 1;
1098         final int currentTerm = 2;
1099
1100         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1101             @Override
1102             public int getSnapshotChunkSize() {
1103                 return 50;
1104             }
1105         });
1106
1107         actorContext.setCommitIndex(commitIndex);
1108
1109         leader = new Leader(actorContext);
1110
1111         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1112         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1113
1114         Map<String, String> leadersSnapshot = new HashMap<>();
1115         leadersSnapshot.put("1", "A");
1116         leadersSnapshot.put("2", "B");
1117         leadersSnapshot.put("3", "C");
1118
1119         // set the snapshot variables in replicatedlog
1120         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1121         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1122         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1123
1124         ByteString bs = toByteString(leadersSnapshot);
1125         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1126                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1127         leader.setSnapshot(snapshot);
1128
1129         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1130
1131         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1132                 InstallSnapshot.class);
1133
1134         assertEquals(1, installSnapshot.getChunkIndex());
1135         assertEquals(3, installSnapshot.getTotalChunks());
1136         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1137                 installSnapshot.getLastChunkHashCode().get().intValue());
1138
1139         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1140
1141         followerActor.underlyingActor().clear();
1142
1143         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1144                 FOLLOWER_ID, 1, true));
1145
1146         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1147
1148         assertEquals(2, installSnapshot.getChunkIndex());
1149         assertEquals(3, installSnapshot.getTotalChunks());
1150         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1151     }
1152
1153     @Test
1154     public void testLeaderInstallSnapshotState() {
1155         logStart("testLeaderInstallSnapshotState");
1156
1157         Map<String, String> leadersSnapshot = new HashMap<>();
1158         leadersSnapshot.put("1", "A");
1159         leadersSnapshot.put("2", "B");
1160         leadersSnapshot.put("3", "C");
1161
1162         ByteString bs = toByteString(leadersSnapshot);
1163         byte[] barray = bs.toByteArray();
1164
1165         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1166         fts.setSnapshotBytes(bs);
1167
1168         assertEquals(bs.size(), barray.length);
1169
1170         int chunkIndex = 0;
1171         for (int i = 0; i < barray.length; i = i + 50) {
1172             int length = i + 50;
1173             chunkIndex++;
1174
1175             if (i + 50 > barray.length) {
1176                 length = barray.length;
1177             }
1178
1179             byte[] chunk = fts.getNextChunk();
1180             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1181             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1182
1183             fts.markSendStatus(true);
1184             if (!fts.isLastChunk(chunkIndex)) {
1185                 fts.incrementChunkIndex();
1186             }
1187         }
1188
1189         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1190     }
1191
1192     @Override
1193     protected Leader createBehavior(final RaftActorContext actorContext) {
1194         return new Leader(actorContext);
1195     }
1196
1197     @Override
1198     protected MockRaftActorContext createActorContext() {
1199         return createActorContext(leaderActor);
1200     }
1201
1202     @Override
1203     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1204         return createActorContext(LEADER_ID, actorRef);
1205     }
1206
1207     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1208         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1209         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1210         configParams.setElectionTimeoutFactor(100000);
1211         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1212         context.setConfigParams(configParams);
1213         context.setPayloadVersion(payloadVersion);
1214         return context;
1215     }
1216
1217     private MockRaftActorContext createActorContextWithFollower() {
1218         MockRaftActorContext actorContext = createActorContext();
1219         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1220                 followerActor.path().toString()).build());
1221         return actorContext;
1222     }
1223
1224     private MockRaftActorContext createFollowerActorContextWithLeader() {
1225         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1226         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1227         followerConfig.setElectionTimeoutFactor(10000);
1228         followerActorContext.setConfigParams(followerConfig);
1229         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1230         return followerActorContext;
1231     }
1232
1233     @Test
1234     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1235         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1236
1237         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1238
1239         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1240
1241         Follower follower = new Follower(followerActorContext);
1242         followerActor.underlyingActor().setBehavior(follower);
1243         followerActorContext.setCurrentBehavior(follower);
1244
1245         Map<String, String> peerAddresses = new HashMap<>();
1246         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1247
1248         leaderActorContext.setPeerAddresses(peerAddresses);
1249
1250         leaderActorContext.getReplicatedLog().removeFrom(0);
1251
1252         //create 3 entries
1253         leaderActorContext.setReplicatedLog(
1254                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1255
1256         leaderActorContext.setCommitIndex(1);
1257
1258         followerActorContext.getReplicatedLog().removeFrom(0);
1259
1260         // follower too has the exact same log entries and has the same commit index
1261         followerActorContext.setReplicatedLog(
1262                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1263
1264         followerActorContext.setCommitIndex(1);
1265
1266         leader = new Leader(leaderActorContext);
1267         leaderActorContext.setCurrentBehavior(leader);
1268
1269         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1270
1271         assertEquals(-1, appendEntries.getLeaderCommit());
1272         assertEquals(0, appendEntries.getEntries().size());
1273         assertEquals(0, appendEntries.getPrevLogIndex());
1274
1275         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1276                 leaderActor, AppendEntriesReply.class);
1277
1278         assertEquals(2, appendEntriesReply.getLogLastIndex());
1279         assertEquals(1, appendEntriesReply.getLogLastTerm());
1280
1281         // follower returns its next index
1282         assertEquals(2, appendEntriesReply.getLogLastIndex());
1283         assertEquals(1, appendEntriesReply.getLogLastTerm());
1284
1285         follower.close();
1286     }
1287
1288     @Test
1289     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1290         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1291
1292         final MockRaftActorContext leaderActorContext = createActorContext();
1293
1294         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1295         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1296
1297         Follower follower = new Follower(followerActorContext);
1298         followerActor.underlyingActor().setBehavior(follower);
1299         followerActorContext.setCurrentBehavior(follower);
1300
1301         Map<String, String> leaderPeerAddresses = new HashMap<>();
1302         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1303
1304         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1305
1306         leaderActorContext.getReplicatedLog().removeFrom(0);
1307
1308         leaderActorContext.setReplicatedLog(
1309                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1310
1311         leaderActorContext.setCommitIndex(1);
1312
1313         followerActorContext.getReplicatedLog().removeFrom(0);
1314
1315         followerActorContext.setReplicatedLog(
1316                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1317
1318         // follower has the same log entries but its commit index > leaders commit index
1319         followerActorContext.setCommitIndex(2);
1320
1321         leader = new Leader(leaderActorContext);
1322
1323         // Initial heartbeat
1324         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1325
1326         assertEquals(-1, appendEntries.getLeaderCommit());
1327         assertEquals(0, appendEntries.getEntries().size());
1328         assertEquals(0, appendEntries.getPrevLogIndex());
1329
1330         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1331                 leaderActor, AppendEntriesReply.class);
1332
1333         assertEquals(2, appendEntriesReply.getLogLastIndex());
1334         assertEquals(1, appendEntriesReply.getLogLastTerm());
1335
1336         leaderActor.underlyingActor().setBehavior(follower);
1337         leader.handleMessage(followerActor, appendEntriesReply);
1338
1339         leaderActor.underlyingActor().clear();
1340         followerActor.underlyingActor().clear();
1341
1342         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1343                 TimeUnit.MILLISECONDS);
1344
1345         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1346
1347         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1348
1349         assertEquals(2, appendEntries.getLeaderCommit());
1350         assertEquals(0, appendEntries.getEntries().size());
1351         assertEquals(2, appendEntries.getPrevLogIndex());
1352
1353         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1354
1355         assertEquals(2, appendEntriesReply.getLogLastIndex());
1356         assertEquals(1, appendEntriesReply.getLogLastTerm());
1357
1358         assertEquals(2, followerActorContext.getCommitIndex());
1359
1360         follower.close();
1361     }
1362
1363     @Test
1364     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1365         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1366
1367         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1368         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1369                 new FiniteDuration(1000, TimeUnit.SECONDS));
1370
1371         leaderActorContext.setReplicatedLog(
1372                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1373         long leaderCommitIndex = 2;
1374         leaderActorContext.setCommitIndex(leaderCommitIndex);
1375         leaderActorContext.setLastApplied(leaderCommitIndex);
1376
1377         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1378         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1379
1380         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1381
1382         followerActorContext.setReplicatedLog(
1383                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1384         followerActorContext.setCommitIndex(0);
1385         followerActorContext.setLastApplied(0);
1386
1387         Follower follower = new Follower(followerActorContext);
1388         followerActor.underlyingActor().setBehavior(follower);
1389
1390         leader = new Leader(leaderActorContext);
1391
1392         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1393         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1394                 AppendEntriesReply.class);
1395
1396         MessageCollectorActor.clearMessages(followerActor);
1397         MessageCollectorActor.clearMessages(leaderActor);
1398
1399         // Verify initial AppendEntries sent.
1400         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1401         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1402         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1403
1404         leaderActor.underlyingActor().setBehavior(leader);
1405
1406         leader.handleMessage(followerActor, appendEntriesReply);
1407
1408         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1409         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1410
1411         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1412         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1413         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1414
1415         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1416         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1417                 appendEntries.getEntries().get(0).getData());
1418         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1419         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1420                 appendEntries.getEntries().get(1).getData());
1421
1422         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1423         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1424
1425         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1426
1427         ApplyState applyState = applyStateList.get(0);
1428         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1429         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1430         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1431                 applyState.getReplicatedLogEntry().getData());
1432
1433         applyState = applyStateList.get(1);
1434         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1435         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1436         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1437                 applyState.getReplicatedLogEntry().getData());
1438
1439         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1440         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1441     }
1442
1443     @Test
1444     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1445         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1446
1447         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1448         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1449                 new FiniteDuration(1000, TimeUnit.SECONDS));
1450
1451         leaderActorContext.setReplicatedLog(
1452                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1453         long leaderCommitIndex = 1;
1454         leaderActorContext.setCommitIndex(leaderCommitIndex);
1455         leaderActorContext.setLastApplied(leaderCommitIndex);
1456
1457         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1458         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1459
1460         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1461
1462         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1463         followerActorContext.setCommitIndex(-1);
1464         followerActorContext.setLastApplied(-1);
1465
1466         Follower follower = new Follower(followerActorContext);
1467         followerActor.underlyingActor().setBehavior(follower);
1468         followerActorContext.setCurrentBehavior(follower);
1469
1470         leader = new Leader(leaderActorContext);
1471
1472         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1473         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1474                 AppendEntriesReply.class);
1475
1476         MessageCollectorActor.clearMessages(followerActor);
1477         MessageCollectorActor.clearMessages(leaderActor);
1478
1479         // Verify initial AppendEntries sent with the leader's current commit index.
1480         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1481         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1482         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1483
1484         leaderActor.underlyingActor().setBehavior(leader);
1485         leaderActorContext.setCurrentBehavior(leader);
1486
1487         leader.handleMessage(followerActor, appendEntriesReply);
1488
1489         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1490         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1491
1492         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1493         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1494         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1495
1496         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1497         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1498                 appendEntries.getEntries().get(0).getData());
1499         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1500         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1501                 appendEntries.getEntries().get(1).getData());
1502
1503         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1504         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1505
1506         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1507
1508         ApplyState applyState = applyStateList.get(0);
1509         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1510         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1511         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1512                 applyState.getReplicatedLogEntry().getData());
1513
1514         applyState = applyStateList.get(1);
1515         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1516         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1517         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1518                 applyState.getReplicatedLogEntry().getData());
1519
1520         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1521         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1522     }
1523
1524     @Test
1525     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1526         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1527
1528         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1529         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1530                 new FiniteDuration(1000, TimeUnit.SECONDS));
1531
1532         leaderActorContext.setReplicatedLog(
1533                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1534         long leaderCommitIndex = 1;
1535         leaderActorContext.setCommitIndex(leaderCommitIndex);
1536         leaderActorContext.setLastApplied(leaderCommitIndex);
1537
1538         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1539         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1540
1541         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1542
1543         followerActorContext.setReplicatedLog(
1544                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1545         followerActorContext.setCommitIndex(-1);
1546         followerActorContext.setLastApplied(-1);
1547
1548         Follower follower = new Follower(followerActorContext);
1549         followerActor.underlyingActor().setBehavior(follower);
1550         followerActorContext.setCurrentBehavior(follower);
1551
1552         leader = new Leader(leaderActorContext);
1553
1554         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1555         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1556                 AppendEntriesReply.class);
1557
1558         MessageCollectorActor.clearMessages(followerActor);
1559         MessageCollectorActor.clearMessages(leaderActor);
1560
1561         // Verify initial AppendEntries sent with the leader's current commit index.
1562         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1563         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1564         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1565
1566         leaderActor.underlyingActor().setBehavior(leader);
1567         leaderActorContext.setCurrentBehavior(leader);
1568
1569         leader.handleMessage(followerActor, appendEntriesReply);
1570
1571         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1572         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1573
1574         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1575         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1576         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1577
1578         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1579         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1580         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1581                 appendEntries.getEntries().get(0).getData());
1582         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1583         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1584         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1585                 appendEntries.getEntries().get(1).getData());
1586
1587         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1588         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1589
1590         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1591
1592         ApplyState applyState = applyStateList.get(0);
1593         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1594         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1595         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1596                 applyState.getReplicatedLogEntry().getData());
1597
1598         applyState = applyStateList.get(1);
1599         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1600         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1601         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1602                 applyState.getReplicatedLogEntry().getData());
1603
1604         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1605         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1606         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1607     }
1608
1609     @Test
1610     public void testHandleAppendEntriesReplyWithNewerTerm() {
1611         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1612
1613         MockRaftActorContext leaderActorContext = createActorContext();
1614         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1615                 new FiniteDuration(10000, TimeUnit.SECONDS));
1616
1617         leaderActorContext.setReplicatedLog(
1618                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1619
1620         leader = new Leader(leaderActorContext);
1621         leaderActor.underlyingActor().setBehavior(leader);
1622         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1623
1624         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1625                 AppendEntriesReply.class);
1626
1627         assertEquals(false, appendEntriesReply.isSuccess());
1628         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1629
1630         MessageCollectorActor.clearMessages(leaderActor);
1631     }
1632
1633     @Test
1634     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1635         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1636
1637         MockRaftActorContext leaderActorContext = createActorContext();
1638         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1639                 new FiniteDuration(10000, TimeUnit.SECONDS));
1640
1641         leaderActorContext.setReplicatedLog(
1642                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1643         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1644
1645         leader = new Leader(leaderActorContext);
1646         leaderActor.underlyingActor().setBehavior(leader);
1647         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1648
1649         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1650                 AppendEntriesReply.class);
1651
1652         assertEquals(false, appendEntriesReply.isSuccess());
1653         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1654
1655         MessageCollectorActor.clearMessages(leaderActor);
1656     }
1657
1658     @Test
1659     public void testHandleAppendEntriesReplySuccess() throws Exception {
1660         logStart("testHandleAppendEntriesReplySuccess");
1661
1662         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1663
1664         leaderActorContext.setReplicatedLog(
1665                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1666
1667         leaderActorContext.setCommitIndex(1);
1668         leaderActorContext.setLastApplied(1);
1669         leaderActorContext.getTermInformation().update(1, "leader");
1670
1671         leader = new Leader(leaderActorContext);
1672
1673         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1674
1675         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1676         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1677
1678         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1679
1680         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1681
1682         assertEquals(RaftState.Leader, raftActorBehavior.state());
1683
1684         assertEquals(2, leaderActorContext.getCommitIndex());
1685
1686         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1687                 leaderActor, ApplyJournalEntries.class);
1688
1689         assertEquals(2, leaderActorContext.getLastApplied());
1690
1691         assertEquals(2, applyJournalEntries.getToIndex());
1692
1693         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1694                 ApplyState.class);
1695
1696         assertEquals(1,applyStateList.size());
1697
1698         ApplyState applyState = applyStateList.get(0);
1699
1700         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1701
1702         assertEquals(2, followerInfo.getMatchIndex());
1703         assertEquals(3, followerInfo.getNextIndex());
1704         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1705         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1706     }
1707
1708     @Test
1709     public void testHandleAppendEntriesReplyUnknownFollower() {
1710         logStart("testHandleAppendEntriesReplyUnknownFollower");
1711
1712         MockRaftActorContext leaderActorContext = createActorContext();
1713
1714         leader = new Leader(leaderActorContext);
1715
1716         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1717
1718         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1719
1720         assertEquals(RaftState.Leader, raftActorBehavior.state());
1721     }
1722
1723     @Test
1724     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1725         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1726
1727         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1728         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1729                 new FiniteDuration(1000, TimeUnit.SECONDS));
1730         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1731
1732         leaderActorContext.setReplicatedLog(
1733                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1734         long leaderCommitIndex = 3;
1735         leaderActorContext.setCommitIndex(leaderCommitIndex);
1736         leaderActorContext.setLastApplied(leaderCommitIndex);
1737
1738         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1739         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1740         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1741         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1742
1743         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1744
1745         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1746         followerActorContext.setCommitIndex(-1);
1747         followerActorContext.setLastApplied(-1);
1748
1749         Follower follower = new Follower(followerActorContext);
1750         followerActor.underlyingActor().setBehavior(follower);
1751         followerActorContext.setCurrentBehavior(follower);
1752
1753         leader = new Leader(leaderActorContext);
1754
1755         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1756         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1757                 AppendEntriesReply.class);
1758
1759         MessageCollectorActor.clearMessages(followerActor);
1760         MessageCollectorActor.clearMessages(leaderActor);
1761
1762         // Verify initial AppendEntries sent with the leader's current commit index.
1763         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1764         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1765         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1766
1767         leaderActor.underlyingActor().setBehavior(leader);
1768         leaderActorContext.setCurrentBehavior(leader);
1769
1770         leader.handleMessage(followerActor, appendEntriesReply);
1771
1772         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1773                 AppendEntries.class, 2);
1774         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1775
1776         appendEntries = appendEntriesList.get(0);
1777         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1778         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1779         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1780
1781         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1782         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1783                 appendEntries.getEntries().get(0).getData());
1784         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1785         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1786                 appendEntries.getEntries().get(1).getData());
1787
1788         appendEntries = appendEntriesList.get(1);
1789         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1790         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1791         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1792
1793         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1794         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1795                 appendEntries.getEntries().get(0).getData());
1796         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1797         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1798                 appendEntries.getEntries().get(1).getData());
1799
1800         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1801         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1802
1803         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1804
1805         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1806         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1807     }
1808
1809     @Test
1810     public void testHandleRequestVoteReply() {
1811         logStart("testHandleRequestVoteReply");
1812
1813         MockRaftActorContext leaderActorContext = createActorContext();
1814
1815         leader = new Leader(leaderActorContext);
1816
1817         // Should be a no-op.
1818         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1819                 new RequestVoteReply(1, true));
1820
1821         assertEquals(RaftState.Leader, raftActorBehavior.state());
1822
1823         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1824
1825         assertEquals(RaftState.Leader, raftActorBehavior.state());
1826     }
1827
1828     @Test
1829     public void testIsolatedLeaderCheckNoFollowers() {
1830         logStart("testIsolatedLeaderCheckNoFollowers");
1831
1832         MockRaftActorContext leaderActorContext = createActorContext();
1833
1834         leader = new Leader(leaderActorContext);
1835         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1836         assertTrue(newBehavior instanceof Leader);
1837     }
1838
1839     @Test
1840     public void testIsolatedLeaderCheckNoVotingFollowers() {
1841         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1842
1843         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1844         Follower follower = new Follower(followerActorContext);
1845         followerActor.underlyingActor().setBehavior(follower);
1846
1847         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1848         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1849                 new FiniteDuration(1000, TimeUnit.SECONDS));
1850         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1851
1852         leader = new Leader(leaderActorContext);
1853         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1854         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1855         assertTrue("Expected Leader", newBehavior instanceof Leader);
1856     }
1857
1858     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
1859         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1860         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1861
1862         MockRaftActorContext leaderActorContext = createActorContext();
1863
1864         Map<String, String> peerAddresses = new HashMap<>();
1865         peerAddresses.put("follower-1", followerActor1.path().toString());
1866         peerAddresses.put("follower-2", followerActor2.path().toString());
1867
1868         leaderActorContext.setPeerAddresses(peerAddresses);
1869         leaderActorContext.setRaftPolicy(raftPolicy);
1870
1871         leader = new Leader(leaderActorContext);
1872
1873         leader.markFollowerActive("follower-1");
1874         leader.markFollowerActive("follower-2");
1875         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1876         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1877
1878         // kill 1 follower and verify if that got killed
1879         final JavaTestKit probe = new JavaTestKit(getSystem());
1880         probe.watch(followerActor1);
1881         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1882         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1883         assertEquals(termMsg1.getActor(), followerActor1);
1884
1885         leader.markFollowerInActive("follower-1");
1886         leader.markFollowerActive("follower-2");
1887         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1888         assertTrue("Behavior not instance of Leader when majority of followers are active",
1889                 newBehavior instanceof Leader);
1890
1891         // kill 2nd follower and leader should change to Isolated leader
1892         followerActor2.tell(PoisonPill.getInstance(), null);
1893         probe.watch(followerActor2);
1894         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1895         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1896         assertEquals(termMsg2.getActor(), followerActor2);
1897
1898         leader.markFollowerInActive("follower-2");
1899         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1900     }
1901
1902     @Test
1903     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1904         logStart("testIsolatedLeaderCheckTwoFollowers");
1905
1906         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1907
1908         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1909             newBehavior instanceof IsolatedLeader);
1910     }
1911
1912     @Test
1913     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1914         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1915
1916         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1917
1918         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1919                 newBehavior instanceof Leader);
1920     }
1921
1922     @Test
1923     public void testLaggingFollowerStarvation() throws Exception {
1924         logStart("testLaggingFollowerStarvation");
1925
1926         String leaderActorId = actorFactory.generateActorId("leader");
1927         String follower1ActorId = actorFactory.generateActorId("follower");
1928         String follower2ActorId = actorFactory.generateActorId("follower");
1929
1930         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1931         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1932
1933         MockRaftActorContext leaderActorContext =
1934                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1935
1936         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1937         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1938         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1939
1940         leaderActorContext.setConfigParams(configParams);
1941
1942         leaderActorContext.setReplicatedLog(
1943                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1944
1945         Map<String, String> peerAddresses = new HashMap<>();
1946         peerAddresses.put(follower1ActorId,
1947                 follower1Actor.path().toString());
1948         peerAddresses.put(follower2ActorId,
1949                 follower2Actor.path().toString());
1950
1951         leaderActorContext.setPeerAddresses(peerAddresses);
1952         leaderActorContext.getTermInformation().update(1, leaderActorId);
1953
1954         leader = createBehavior(leaderActorContext);
1955
1956         leaderActor.underlyingActor().setBehavior(leader);
1957
1958         for (int i = 1; i < 6; i++) {
1959             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1960             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
1961                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1962             assertTrue(newBehavior == leader);
1963             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1964         }
1965
1966         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1967         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1968
1969         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1970                 heartbeats.size() > 1);
1971
1972         // Check if follower-2 got AppendEntries during this time and was not starved
1973         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1974
1975         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1976                 appendEntries.size() > 1);
1977     }
1978
1979     @Test
1980     public void testReplicationConsensusWithNonVotingFollower() {
1981         logStart("testReplicationConsensusWithNonVotingFollower");
1982
1983         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1984         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1985                 new FiniteDuration(1000, TimeUnit.SECONDS));
1986
1987         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1988         leaderActorContext.setCommitIndex(-1);
1989         leaderActorContext.setLastApplied(-1);
1990
1991         String nonVotingFollowerId = "nonvoting-follower";
1992         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1993                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1994
1995         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
1996                 VotingState.NON_VOTING);
1997
1998         leader = new Leader(leaderActorContext);
1999         leaderActorContext.setCurrentBehavior(leader);
2000
2001         // Ignore initial heartbeats
2002         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2003         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2004
2005         MessageCollectorActor.clearMessages(followerActor);
2006         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2007         MessageCollectorActor.clearMessages(leaderActor);
2008
2009         // Send a Replicate message and wait for AppendEntries.
2010         sendReplicate(leaderActorContext, 0);
2011
2012         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2013         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2014
2015         // Send reply only from the voting follower and verify consensus via ApplyState.
2016         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2017
2018         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2019
2020         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2021
2022         MessageCollectorActor.clearMessages(followerActor);
2023         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2024         MessageCollectorActor.clearMessages(leaderActor);
2025
2026         // Send another Replicate message
2027         sendReplicate(leaderActorContext, 1);
2028
2029         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2030         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2031                 AppendEntries.class);
2032         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2033         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2034
2035         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2036         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2037
2038         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2039
2040         // Send reply from the voting follower and verify consensus.
2041         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2042
2043         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2044     }
2045
2046     @Test
2047     public void testTransferLeadershipWithFollowerInSync() {
2048         logStart("testTransferLeadershipWithFollowerInSync");
2049
2050         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2051         leaderActorContext.setLastApplied(-1);
2052         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2053                 new FiniteDuration(1000, TimeUnit.SECONDS));
2054         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2055
2056         leader = new Leader(leaderActorContext);
2057         leaderActorContext.setCurrentBehavior(leader);
2058
2059         // Initial heartbeat
2060         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2061         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2062         MessageCollectorActor.clearMessages(followerActor);
2063
2064         sendReplicate(leaderActorContext, 0);
2065         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2066
2067         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2068         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2069         MessageCollectorActor.clearMessages(followerActor);
2070
2071         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2072         leader.transferLeadership(mockTransferCohort);
2073
2074         verify(mockTransferCohort, never()).transferComplete();
2075         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2076         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2077
2078         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2079         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2080
2081         // Leader should force an election timeout
2082         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2083
2084         verify(mockTransferCohort).transferComplete();
2085     }
2086
2087     @Test
2088     public void testTransferLeadershipWithEmptyLog() {
2089         logStart("testTransferLeadershipWithEmptyLog");
2090
2091         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2092         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2093                 new FiniteDuration(1000, TimeUnit.SECONDS));
2094         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2095
2096         leader = new Leader(leaderActorContext);
2097         leaderActorContext.setCurrentBehavior(leader);
2098
2099         // Initial heartbeat
2100         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2101         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2102         MessageCollectorActor.clearMessages(followerActor);
2103
2104         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2105         leader.transferLeadership(mockTransferCohort);
2106
2107         verify(mockTransferCohort, never()).transferComplete();
2108         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2109         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2110
2111         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2112         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2113
2114         // Leader should force an election timeout
2115         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2116
2117         verify(mockTransferCohort).transferComplete();
2118     }
2119
2120     @Test
2121     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2122         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2123
2124         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2125         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2126                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2127
2128         leader = new Leader(leaderActorContext);
2129         leaderActorContext.setCurrentBehavior(leader);
2130
2131         // Initial heartbeat
2132         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2133         MessageCollectorActor.clearMessages(followerActor);
2134
2135         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2136         leader.transferLeadership(mockTransferCohort);
2137
2138         verify(mockTransferCohort, never()).transferComplete();
2139
2140         // Sync up the follower.
2141         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2142         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2143         MessageCollectorActor.clearMessages(followerActor);
2144
2145         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2146                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2147         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2148         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2149         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2150
2151         // Leader should force an election timeout
2152         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2153
2154         verify(mockTransferCohort).transferComplete();
2155     }
2156
2157     @Test
2158     public void testTransferLeadershipWithFollowerSyncTimeout() {
2159         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2160
2161         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2162         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2163                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2164         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2165         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2166
2167         leader = new Leader(leaderActorContext);
2168         leaderActorContext.setCurrentBehavior(leader);
2169
2170         // Initial heartbeat
2171         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2172         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2173         MessageCollectorActor.clearMessages(followerActor);
2174
2175         sendReplicate(leaderActorContext, 0);
2176         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2177
2178         MessageCollectorActor.clearMessages(followerActor);
2179
2180         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2181         leader.transferLeadership(mockTransferCohort);
2182
2183         verify(mockTransferCohort, never()).transferComplete();
2184
2185         // Send heartbeats to time out the transfer.
2186         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2187             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2188                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2189             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2190         }
2191
2192         verify(mockTransferCohort).abortTransfer();
2193         verify(mockTransferCohort, never()).transferComplete();
2194         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2195     }
2196
2197     @Override
2198     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
2199             ActorRef actorRef, RaftRPC rpc) throws Exception {
2200         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2201         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2202     }
2203
2204     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2205
2206         private final long electionTimeOutIntervalMillis;
2207         private final int snapshotChunkSize;
2208
2209         MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2210             super();
2211             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2212             this.snapshotChunkSize = snapshotChunkSize;
2213         }
2214
2215         @Override
2216         public FiniteDuration getElectionTimeOutInterval() {
2217             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2218         }
2219
2220         @Override
2221         public int getSnapshotChunkSize() {
2222             return snapshotChunkSize;
2223         }
2224     }
2225 }