Add ShutDown message to RaftActor to transfer leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.never;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.google.protobuf.ByteString;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
35 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
36 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
37 import org.opendaylight.controller.cluster.raft.RaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
39 import org.opendaylight.controller.cluster.raft.RaftState;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
42 import org.opendaylight.controller.cluster.raft.SerializationUtils;
43 import org.opendaylight.controller.cluster.raft.Snapshot;
44 import org.opendaylight.controller.cluster.raft.VotingState;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
49 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
50 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
51 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
53 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
60 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
61 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
62 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
63 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
64 import scala.concurrent.duration.FiniteDuration;
65
66 public class LeaderTest extends AbstractLeaderTest {
67
68     static final String FOLLOWER_ID = "follower";
69     public static final String LEADER_ID = "leader";
70
71     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
72             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
73
74     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
75             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
76
77     private Leader leader;
78     private final short payloadVersion = 5;
79
80     @Override
81     @After
82     public void tearDown() throws Exception {
83         if(leader != null) {
84             leader.close();
85         }
86
87         super.tearDown();
88     }
89
90     @Test
91     public void testHandleMessageForUnknownMessage() throws Exception {
92         logStart("testHandleMessageForUnknownMessage");
93
94         leader = new Leader(createActorContext());
95
96         // handle message should return the Leader state when it receives an
97         // unknown message
98         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
99         Assert.assertTrue(behavior instanceof Leader);
100     }
101
102     @Test
103     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
104         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
105
106         MockRaftActorContext actorContext = createActorContextWithFollower();
107         short payloadVersion = (short)5;
108         actorContext.setPayloadVersion(payloadVersion);
109
110         long term = 1;
111         actorContext.getTermInformation().update(term, "");
112
113         leader = new Leader(actorContext);
114
115         // Leader should send an immediate heartbeat with no entries as follower is inactive.
116         long lastIndex = actorContext.getReplicatedLog().lastIndex();
117         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
118         assertEquals("getTerm", term, appendEntries.getTerm());
119         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
120         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
121         assertEquals("Entries size", 0, appendEntries.getEntries().size());
122         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
123
124         // The follower would normally reply - simulate that explicitly here.
125         leader.handleMessage(followerActor, new AppendEntriesReply(
126                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
127         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
128
129         followerActor.underlyingActor().clear();
130
131         // Sleep for the heartbeat interval so AppendEntries is sent.
132         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
133                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
134
135         leader.handleMessage(leaderActor, new SendHeartBeat());
136
137         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
138         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
139         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
140         assertEquals("Entries size", 1, appendEntries.getEntries().size());
141         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
142         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
143         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
144     }
145
146
147     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
148         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
149         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
150                 1, index, payload);
151         actorContext.getReplicatedLog().append(newEntry);
152         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
153     }
154
155     @Test
156     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
157         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
158
159         MockRaftActorContext actorContext = createActorContextWithFollower();
160
161         long term = 1;
162         actorContext.getTermInformation().update(term, "");
163
164         leader = new Leader(actorContext);
165
166         // Leader will send an immediate heartbeat - ignore it.
167         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
168
169         // The follower would normally reply - simulate that explicitly here.
170         long lastIndex = actorContext.getReplicatedLog().lastIndex();
171         leader.handleMessage(followerActor, new AppendEntriesReply(
172                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
173         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
174
175         followerActor.underlyingActor().clear();
176
177         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
178
179         // State should not change
180         assertTrue(raftBehavior instanceof Leader);
181
182         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
183         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
184         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
185         assertEquals("Entries size", 1, appendEntries.getEntries().size());
186         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
187         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
188         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
189         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
190     }
191
192     @Test
193     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
194         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
195
196         MockRaftActorContext actorContext = createActorContextWithFollower();
197         actorContext.setRaftPolicy(createRaftPolicy(true, true));
198
199         long term = 1;
200         actorContext.getTermInformation().update(term, "");
201
202         leader = new Leader(actorContext);
203
204         // Leader will send an immediate heartbeat - ignore it.
205         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
206
207         // The follower would normally reply - simulate that explicitly here.
208         long lastIndex = actorContext.getReplicatedLog().lastIndex();
209         leader.handleMessage(followerActor, new AppendEntriesReply(
210                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
211         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
212
213         followerActor.underlyingActor().clear();
214
215         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
216
217         // State should not change
218         assertTrue(raftBehavior instanceof Leader);
219
220         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
221         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
222         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
223         assertEquals("Entries size", 1, appendEntries.getEntries().size());
224         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
225         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
226         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
227         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
228     }
229
230     @Test
231     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
232         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
233
234         MockRaftActorContext actorContext = createActorContextWithFollower();
235         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
236             @Override
237             public FiniteDuration getHeartBeatInterval() {
238                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
239             }
240         });
241
242         long term = 1;
243         actorContext.getTermInformation().update(term, "");
244
245         leader = new Leader(actorContext);
246
247         // Leader will send an immediate heartbeat - ignore it.
248         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
249
250         // The follower would normally reply - simulate that explicitly here.
251         long lastIndex = actorContext.getReplicatedLog().lastIndex();
252         leader.handleMessage(followerActor, new AppendEntriesReply(
253                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
254         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
255
256         followerActor.underlyingActor().clear();
257
258         for(int i=0;i<5;i++) {
259             sendReplicate(actorContext, lastIndex+i+1);
260         }
261
262         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
263         // We expect only 1 message to be sent because of two reasons,
264         // - an append entries reply was not received
265         // - the heartbeat interval has not expired
266         // In this scenario if multiple messages are sent they would likely be duplicates
267         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
268     }
269
270     @Test
271     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
272         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
273
274         MockRaftActorContext actorContext = createActorContextWithFollower();
275         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
276             @Override
277             public FiniteDuration getHeartBeatInterval() {
278                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
279             }
280         });
281
282         long term = 1;
283         actorContext.getTermInformation().update(term, "");
284
285         leader = new Leader(actorContext);
286
287         // Leader will send an immediate heartbeat - ignore it.
288         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
289
290         // The follower would normally reply - simulate that explicitly here.
291         long lastIndex = actorContext.getReplicatedLog().lastIndex();
292         leader.handleMessage(followerActor, new AppendEntriesReply(
293                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
294         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
295
296         followerActor.underlyingActor().clear();
297
298         for(int i=0;i<3;i++) {
299             sendReplicate(actorContext, lastIndex+i+1);
300             leader.handleMessage(followerActor, new AppendEntriesReply(
301                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
302
303         }
304
305         for(int i=3;i<5;i++) {
306             sendReplicate(actorContext, lastIndex + i + 1);
307         }
308
309         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
310         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
311         // get sent to the follower - but not the 5th
312         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
313
314         for(int i=0;i<4;i++) {
315             long expected = allMessages.get(i).getEntries().get(0).getIndex();
316             assertEquals(expected, i+2);
317         }
318     }
319
320     @Test
321     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
322         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
323
324         MockRaftActorContext actorContext = createActorContextWithFollower();
325         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
326             @Override
327             public FiniteDuration getHeartBeatInterval() {
328                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
329             }
330         });
331
332         long term = 1;
333         actorContext.getTermInformation().update(term, "");
334
335         leader = new Leader(actorContext);
336
337         // Leader will send an immediate heartbeat - ignore it.
338         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
339
340         // The follower would normally reply - simulate that explicitly here.
341         long lastIndex = actorContext.getReplicatedLog().lastIndex();
342         leader.handleMessage(followerActor, new AppendEntriesReply(
343                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
344         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
345
346         followerActor.underlyingActor().clear();
347
348         sendReplicate(actorContext, lastIndex+1);
349
350         // Wait slightly longer than heartbeat duration
351         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
352
353         leader.handleMessage(leaderActor, new SendHeartBeat());
354
355         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
356         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
357
358         assertEquals(1, allMessages.get(0).getEntries().size());
359         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
360         assertEquals(1, allMessages.get(1).getEntries().size());
361         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
362
363     }
364
365     @Test
366     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
367         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
368
369         MockRaftActorContext actorContext = createActorContextWithFollower();
370         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
371             @Override
372             public FiniteDuration getHeartBeatInterval() {
373                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
374             }
375         });
376
377         long term = 1;
378         actorContext.getTermInformation().update(term, "");
379
380         leader = new Leader(actorContext);
381
382         // Leader will send an immediate heartbeat - ignore it.
383         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
384
385         // The follower would normally reply - simulate that explicitly here.
386         long lastIndex = actorContext.getReplicatedLog().lastIndex();
387         leader.handleMessage(followerActor, new AppendEntriesReply(
388                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
389         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
390
391         followerActor.underlyingActor().clear();
392
393         for(int i=0;i<3;i++) {
394             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
395             leader.handleMessage(leaderActor, new SendHeartBeat());
396         }
397
398         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
399         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
400     }
401
402     @Test
403     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
404         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
405
406         MockRaftActorContext actorContext = createActorContextWithFollower();
407         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
408             @Override
409             public FiniteDuration getHeartBeatInterval() {
410                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
411             }
412         });
413
414         long term = 1;
415         actorContext.getTermInformation().update(term, "");
416
417         leader = new Leader(actorContext);
418
419         // Leader will send an immediate heartbeat - ignore it.
420         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
421
422         // The follower would normally reply - simulate that explicitly here.
423         long lastIndex = actorContext.getReplicatedLog().lastIndex();
424         leader.handleMessage(followerActor, new AppendEntriesReply(
425                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
426         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
427
428         followerActor.underlyingActor().clear();
429
430         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
431         leader.handleMessage(leaderActor, new SendHeartBeat());
432         sendReplicate(actorContext, lastIndex+1);
433
434         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
435         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
436
437         assertEquals(0, allMessages.get(0).getEntries().size());
438         assertEquals(1, allMessages.get(1).getEntries().size());
439     }
440
441
442     @Test
443     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
444         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
445
446         MockRaftActorContext actorContext = createActorContext();
447
448         leader = new Leader(actorContext);
449
450         actorContext.setLastApplied(0);
451
452         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
453         long term = actorContext.getTermInformation().getCurrentTerm();
454         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
455                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
456
457         actorContext.getReplicatedLog().append(newEntry);
458
459         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
460                 new Replicate(leaderActor, "state-id", newEntry));
461
462         // State should not change
463         assertTrue(raftBehavior instanceof Leader);
464
465         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
466
467         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
468         // one since lastApplied state is 0.
469         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
470                 leaderActor, ApplyState.class);
471         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
472
473         for(int i = 0; i <= newLogIndex - 1; i++ ) {
474             ApplyState applyState = applyStateList.get(i);
475             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
476             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
477         }
478
479         ApplyState last = applyStateList.get((int) newLogIndex - 1);
480         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
481         assertEquals("getIdentifier", "state-id", last.getIdentifier());
482     }
483
484     @Test
485     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
486         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
487
488         MockRaftActorContext actorContext = createActorContextWithFollower();
489
490         Map<String, String> leadersSnapshot = new HashMap<>();
491         leadersSnapshot.put("1", "A");
492         leadersSnapshot.put("2", "B");
493         leadersSnapshot.put("3", "C");
494
495         //clears leaders log
496         actorContext.getReplicatedLog().removeFrom(0);
497
498         final int commitIndex = 3;
499         final int snapshotIndex = 2;
500         final int newEntryIndex = 4;
501         final int snapshotTerm = 1;
502         final int currentTerm = 2;
503
504         // set the snapshot variables in replicatedlog
505         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
506         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
507         actorContext.setCommitIndex(commitIndex);
508         //set follower timeout to 2 mins, helps during debugging
509         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
510
511         leader = new Leader(actorContext);
512
513         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
514         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
515
516         // new entry
517         ReplicatedLogImplEntry entry =
518                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
519                         new MockRaftActorContext.MockPayload("D"));
520
521         //update follower timestamp
522         leader.markFollowerActive(FOLLOWER_ID);
523
524         ByteString bs = toByteString(leadersSnapshot);
525         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
526                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
527         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
528         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
529
530         //send first chunk and no InstallSnapshotReply received yet
531         fts.getNextChunk();
532         fts.incrementChunkIndex();
533
534         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
535                 TimeUnit.MILLISECONDS);
536
537         leader.handleMessage(leaderActor, new SendHeartBeat());
538
539         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
540
541         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
542
543         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
544
545         //InstallSnapshotReply received
546         fts.markSendStatus(true);
547
548         leader.handleMessage(leaderActor, new SendHeartBeat());
549
550         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
551
552         assertEquals(commitIndex, is.getLastIncludedIndex());
553     }
554
555     @Test
556     public void testSendAppendEntriesSnapshotScenario() throws Exception {
557         logStart("testSendAppendEntriesSnapshotScenario");
558
559         MockRaftActorContext actorContext = createActorContextWithFollower();
560
561         Map<String, String> leadersSnapshot = new HashMap<>();
562         leadersSnapshot.put("1", "A");
563         leadersSnapshot.put("2", "B");
564         leadersSnapshot.put("3", "C");
565
566         //clears leaders log
567         actorContext.getReplicatedLog().removeFrom(0);
568
569         final int followersLastIndex = 2;
570         final int snapshotIndex = 3;
571         final int newEntryIndex = 4;
572         final int snapshotTerm = 1;
573         final int currentTerm = 2;
574
575         // set the snapshot variables in replicatedlog
576         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
577         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
578         actorContext.setCommitIndex(followersLastIndex);
579
580         leader = new Leader(actorContext);
581
582         // Leader will send an immediate heartbeat - ignore it.
583         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
584
585         // new entry
586         ReplicatedLogImplEntry entry =
587                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
588                         new MockRaftActorContext.MockPayload("D"));
589
590         actorContext.getReplicatedLog().append(entry);
591
592         //update follower timestamp
593         leader.markFollowerActive(FOLLOWER_ID);
594
595         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
596         RaftActorBehavior raftBehavior = leader.handleMessage(
597                 leaderActor, new Replicate(null, "state-id", entry));
598
599         assertTrue(raftBehavior instanceof Leader);
600
601         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
602     }
603
604     @Test
605     public void testInitiateInstallSnapshot() throws Exception {
606         logStart("testInitiateInstallSnapshot");
607
608         MockRaftActorContext actorContext = createActorContextWithFollower();
609
610         //clears leaders log
611         actorContext.getReplicatedLog().removeFrom(0);
612
613         final int followersLastIndex = 2;
614         final int snapshotIndex = 3;
615         final int newEntryIndex = 4;
616         final int snapshotTerm = 1;
617         final int currentTerm = 2;
618
619         // set the snapshot variables in replicatedlog
620         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
621         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
622         actorContext.setLastApplied(3);
623         actorContext.setCommitIndex(followersLastIndex);
624
625         leader = new Leader(actorContext);
626
627         // Leader will send an immediate heartbeat - ignore it.
628         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
629
630         // set the snapshot as absent and check if capture-snapshot is invoked.
631         leader.setSnapshot(null);
632
633         // new entry
634         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
635                 new MockRaftActorContext.MockPayload("D"));
636
637         actorContext.getReplicatedLog().append(entry);
638
639         //update follower timestamp
640         leader.markFollowerActive(FOLLOWER_ID);
641
642         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
643
644         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
645
646         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
647
648         assertTrue(cs.isInstallSnapshotInitiated());
649         assertEquals(3, cs.getLastAppliedIndex());
650         assertEquals(1, cs.getLastAppliedTerm());
651         assertEquals(4, cs.getLastIndex());
652         assertEquals(2, cs.getLastTerm());
653
654         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
655         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
656
657         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
658     }
659
660     @Test
661     public void testInitiateForceInstallSnapshot() throws Exception {
662         logStart("testInitiateForceInstallSnapshot");
663
664         MockRaftActorContext actorContext = createActorContextWithFollower();
665
666         final int followersLastIndex = 2;
667         final int snapshotIndex = -1;
668         final int newEntryIndex = 4;
669         final int snapshotTerm = -1;
670         final int currentTerm = 2;
671
672         // set the snapshot variables in replicatedlog
673         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
674         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
675         actorContext.setLastApplied(3);
676         actorContext.setCommitIndex(followersLastIndex);
677
678         actorContext.getReplicatedLog().removeFrom(0);
679
680         leader = new Leader(actorContext);
681
682         // Leader will send an immediate heartbeat - ignore it.
683         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
684
685         // set the snapshot as absent and check if capture-snapshot is invoked.
686         leader.setSnapshot(null);
687
688         for(int i=0;i<4;i++) {
689             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
690                     new MockRaftActorContext.MockPayload("X" + i)));
691         }
692
693         // new entry
694         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
695                 new MockRaftActorContext.MockPayload("D"));
696
697         actorContext.getReplicatedLog().append(entry);
698
699         //update follower timestamp
700         leader.markFollowerActive(FOLLOWER_ID);
701
702         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
703         // installed with a SendInstallSnapshot
704         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
705
706         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
707
708         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
709
710         assertTrue(cs.isInstallSnapshotInitiated());
711         assertEquals(3, cs.getLastAppliedIndex());
712         assertEquals(1, cs.getLastAppliedTerm());
713         assertEquals(4, cs.getLastIndex());
714         assertEquals(2, cs.getLastTerm());
715
716         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
717         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
718
719         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
720     }
721
722
723     @Test
724     public void testInstallSnapshot() throws Exception {
725         logStart("testInstallSnapshot");
726
727         MockRaftActorContext actorContext = createActorContextWithFollower();
728
729         Map<String, String> leadersSnapshot = new HashMap<>();
730         leadersSnapshot.put("1", "A");
731         leadersSnapshot.put("2", "B");
732         leadersSnapshot.put("3", "C");
733
734         //clears leaders log
735         actorContext.getReplicatedLog().removeFrom(0);
736
737         final int lastAppliedIndex = 3;
738         final int snapshotIndex = 2;
739         final int snapshotTerm = 1;
740         final int currentTerm = 2;
741
742         // set the snapshot variables in replicatedlog
743         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
744         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
745         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
746         actorContext.setCommitIndex(lastAppliedIndex);
747         actorContext.setLastApplied(lastAppliedIndex);
748
749         leader = new Leader(actorContext);
750
751         // Initial heartbeat.
752         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
753
754         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
755         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
756
757         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
758                 Collections.<ReplicatedLogEntry>emptyList(),
759                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
760
761         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
762
763         assertTrue(raftBehavior instanceof Leader);
764
765         // check if installsnapshot gets called with the correct values.
766
767         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
768
769         assertNotNull(installSnapshot.getData());
770         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
771         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
772
773         assertEquals(currentTerm, installSnapshot.getTerm());
774     }
775
776     @Test
777     public void testForceInstallSnapshot() throws Exception {
778         logStart("testForceInstallSnapshot");
779
780         MockRaftActorContext actorContext = createActorContextWithFollower();
781
782         Map<String, String> leadersSnapshot = new HashMap<>();
783         leadersSnapshot.put("1", "A");
784         leadersSnapshot.put("2", "B");
785         leadersSnapshot.put("3", "C");
786
787         final int lastAppliedIndex = 3;
788         final int snapshotIndex = -1;
789         final int snapshotTerm = -1;
790         final int currentTerm = 2;
791
792         // set the snapshot variables in replicatedlog
793         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
794         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
795         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
796         actorContext.setCommitIndex(lastAppliedIndex);
797         actorContext.setLastApplied(lastAppliedIndex);
798
799         leader = new Leader(actorContext);
800
801         // Initial heartbeat.
802         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
803
804         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
805         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
806
807         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
808                 Collections.<ReplicatedLogEntry>emptyList(),
809                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
810
811         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
812
813         assertTrue(raftBehavior instanceof Leader);
814
815         // check if installsnapshot gets called with the correct values.
816
817         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
818
819         assertNotNull(installSnapshot.getData());
820         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
821         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
822
823         assertEquals(currentTerm, installSnapshot.getTerm());
824     }
825
826     @Test
827     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
828         logStart("testHandleInstallSnapshotReplyLastChunk");
829
830         MockRaftActorContext actorContext = createActorContextWithFollower();
831
832         final int commitIndex = 3;
833         final int snapshotIndex = 2;
834         final int snapshotTerm = 1;
835         final int currentTerm = 2;
836
837         actorContext.setCommitIndex(commitIndex);
838
839         leader = new Leader(actorContext);
840
841         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
842         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
843
844         // Ignore initial heartbeat.
845         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
846
847         Map<String, String> leadersSnapshot = new HashMap<>();
848         leadersSnapshot.put("1", "A");
849         leadersSnapshot.put("2", "B");
850         leadersSnapshot.put("3", "C");
851
852         // set the snapshot variables in replicatedlog
853
854         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
855         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
856         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
857
858         ByteString bs = toByteString(leadersSnapshot);
859         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
860                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
861         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
862         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
863         while(!fts.isLastChunk(fts.getChunkIndex())) {
864             fts.getNextChunk();
865             fts.incrementChunkIndex();
866         }
867
868         //clears leaders log
869         actorContext.getReplicatedLog().removeFrom(0);
870
871         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
872                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
873
874         assertTrue(raftBehavior instanceof Leader);
875
876         assertEquals(0, leader.followerSnapshotSize());
877         assertEquals(1, leader.followerLogSize());
878         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
879         assertNotNull(fli);
880         assertEquals(commitIndex, fli.getMatchIndex());
881         assertEquals(commitIndex + 1, fli.getNextIndex());
882     }
883
884     @Test
885     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
886         logStart("testSendSnapshotfromInstallSnapshotReply");
887
888         MockRaftActorContext actorContext = createActorContextWithFollower();
889
890         final int commitIndex = 3;
891         final int snapshotIndex = 2;
892         final int snapshotTerm = 1;
893         final int currentTerm = 2;
894
895         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
896             @Override
897             public int getSnapshotChunkSize() {
898                 return 50;
899             }
900         };
901         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
902         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
903
904         actorContext.setConfigParams(configParams);
905         actorContext.setCommitIndex(commitIndex);
906
907         leader = new Leader(actorContext);
908
909         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
910         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
911
912         Map<String, String> leadersSnapshot = new HashMap<>();
913         leadersSnapshot.put("1", "A");
914         leadersSnapshot.put("2", "B");
915         leadersSnapshot.put("3", "C");
916
917         // set the snapshot variables in replicatedlog
918         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
919         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
920         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
921
922         ByteString bs = toByteString(leadersSnapshot);
923         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
924                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
925         leader.setSnapshot(snapshot);
926
927         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
928
929         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
930
931         assertEquals(1, installSnapshot.getChunkIndex());
932         assertEquals(3, installSnapshot.getTotalChunks());
933
934         followerActor.underlyingActor().clear();
935         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
936                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
937
938         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
939
940         assertEquals(2, installSnapshot.getChunkIndex());
941         assertEquals(3, installSnapshot.getTotalChunks());
942
943         followerActor.underlyingActor().clear();
944         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
945                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
946
947         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
948
949         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
950         followerActor.underlyingActor().clear();
951         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
952                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
953
954         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
955
956         Assert.assertNull(installSnapshot);
957     }
958
959
960     @Test
961     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
962         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
963
964         MockRaftActorContext actorContext = createActorContextWithFollower();
965
966         final int commitIndex = 3;
967         final int snapshotIndex = 2;
968         final int snapshotTerm = 1;
969         final int currentTerm = 2;
970
971         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
972             @Override
973             public int getSnapshotChunkSize() {
974                 return 50;
975             }
976         });
977
978         actorContext.setCommitIndex(commitIndex);
979
980         leader = new Leader(actorContext);
981
982         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
983         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
984
985         Map<String, String> leadersSnapshot = new HashMap<>();
986         leadersSnapshot.put("1", "A");
987         leadersSnapshot.put("2", "B");
988         leadersSnapshot.put("3", "C");
989
990         // set the snapshot variables in replicatedlog
991         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
992         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
993         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
994
995         ByteString bs = toByteString(leadersSnapshot);
996         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
997                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
998         leader.setSnapshot(snapshot);
999
1000         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1001         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1002
1003         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1004
1005         assertEquals(1, installSnapshot.getChunkIndex());
1006         assertEquals(3, installSnapshot.getTotalChunks());
1007
1008         followerActor.underlyingActor().clear();
1009
1010         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1011                 FOLLOWER_ID, -1, false));
1012
1013         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1014                 TimeUnit.MILLISECONDS);
1015
1016         leader.handleMessage(leaderActor, new SendHeartBeat());
1017
1018         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1019
1020         assertEquals(1, installSnapshot.getChunkIndex());
1021         assertEquals(3, installSnapshot.getTotalChunks());
1022     }
1023
1024     @Test
1025     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1026         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1027
1028         MockRaftActorContext actorContext = createActorContextWithFollower();
1029
1030         final int commitIndex = 3;
1031         final int snapshotIndex = 2;
1032         final int snapshotTerm = 1;
1033         final int currentTerm = 2;
1034
1035         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1036             @Override
1037             public int getSnapshotChunkSize() {
1038                 return 50;
1039             }
1040         });
1041
1042         actorContext.setCommitIndex(commitIndex);
1043
1044         leader = new Leader(actorContext);
1045
1046         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1047         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1048
1049         Map<String, String> leadersSnapshot = new HashMap<>();
1050         leadersSnapshot.put("1", "A");
1051         leadersSnapshot.put("2", "B");
1052         leadersSnapshot.put("3", "C");
1053
1054         // set the snapshot variables in replicatedlog
1055         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1056         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1057         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1058
1059         ByteString bs = toByteString(leadersSnapshot);
1060         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1061                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1062         leader.setSnapshot(snapshot);
1063
1064         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1065
1066         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1067
1068         assertEquals(1, installSnapshot.getChunkIndex());
1069         assertEquals(3, installSnapshot.getTotalChunks());
1070         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1071
1072         int hashCode = installSnapshot.getData().hashCode();
1073
1074         followerActor.underlyingActor().clear();
1075
1076         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1077                 FOLLOWER_ID, 1, true));
1078
1079         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1080
1081         assertEquals(2, installSnapshot.getChunkIndex());
1082         assertEquals(3, installSnapshot.getTotalChunks());
1083         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1084     }
1085
1086     @Test
1087     public void testFollowerToSnapshotLogic() {
1088         logStart("testFollowerToSnapshotLogic");
1089
1090         MockRaftActorContext actorContext = createActorContext();
1091
1092         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1093             @Override
1094             public int getSnapshotChunkSize() {
1095                 return 50;
1096             }
1097         });
1098
1099         leader = new Leader(actorContext);
1100
1101         Map<String, String> leadersSnapshot = new HashMap<>();
1102         leadersSnapshot.put("1", "A");
1103         leadersSnapshot.put("2", "B");
1104         leadersSnapshot.put("3", "C");
1105
1106         ByteString bs = toByteString(leadersSnapshot);
1107         byte[] barray = bs.toByteArray();
1108
1109         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1110         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1111
1112         assertEquals(bs.size(), barray.length);
1113
1114         int chunkIndex=0;
1115         for (int i=0; i < barray.length; i = i + 50) {
1116             int j = i + 50;
1117             chunkIndex++;
1118
1119             if (i + 50 > barray.length) {
1120                 j = barray.length;
1121             }
1122
1123             ByteString chunk = fts.getNextChunk();
1124             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1125             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1126
1127             fts.markSendStatus(true);
1128             if (!fts.isLastChunk(chunkIndex)) {
1129                 fts.incrementChunkIndex();
1130             }
1131         }
1132
1133         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1134     }
1135
1136     @Override protected RaftActorBehavior createBehavior(
1137         RaftActorContext actorContext) {
1138         return new Leader(actorContext);
1139     }
1140
1141     @Override
1142     protected MockRaftActorContext createActorContext() {
1143         return createActorContext(leaderActor);
1144     }
1145
1146     @Override
1147     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1148         return createActorContext(LEADER_ID, actorRef);
1149     }
1150
1151     private MockRaftActorContext createActorContextWithFollower() {
1152         MockRaftActorContext actorContext = createActorContext();
1153         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1154                 followerActor.path().toString()).build());
1155         return actorContext;
1156     }
1157
1158     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1159         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1160         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1161         configParams.setElectionTimeoutFactor(100000);
1162         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1163         context.setConfigParams(configParams);
1164         context.setPayloadVersion(payloadVersion);
1165         return context;
1166     }
1167
1168     private MockRaftActorContext createFollowerActorContextWithLeader() {
1169         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1170         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1171         followerConfig.setElectionTimeoutFactor(10000);
1172         followerActorContext.setConfigParams(followerConfig);
1173         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1174         return followerActorContext;
1175     }
1176
1177     @Test
1178     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1179         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1180
1181         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1182
1183         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1184
1185         Follower follower = new Follower(followerActorContext);
1186         followerActor.underlyingActor().setBehavior(follower);
1187
1188         Map<String, String> peerAddresses = new HashMap<>();
1189         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1190
1191         leaderActorContext.setPeerAddresses(peerAddresses);
1192
1193         leaderActorContext.getReplicatedLog().removeFrom(0);
1194
1195         //create 3 entries
1196         leaderActorContext.setReplicatedLog(
1197                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1198
1199         leaderActorContext.setCommitIndex(1);
1200
1201         followerActorContext.getReplicatedLog().removeFrom(0);
1202
1203         // follower too has the exact same log entries and has the same commit index
1204         followerActorContext.setReplicatedLog(
1205                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1206
1207         followerActorContext.setCommitIndex(1);
1208
1209         leader = new Leader(leaderActorContext);
1210
1211         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1212
1213         assertEquals(1, appendEntries.getLeaderCommit());
1214         assertEquals(0, appendEntries.getEntries().size());
1215         assertEquals(0, appendEntries.getPrevLogIndex());
1216
1217         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1218                 leaderActor, AppendEntriesReply.class);
1219
1220         assertEquals(2, appendEntriesReply.getLogLastIndex());
1221         assertEquals(1, appendEntriesReply.getLogLastTerm());
1222
1223         // follower returns its next index
1224         assertEquals(2, appendEntriesReply.getLogLastIndex());
1225         assertEquals(1, appendEntriesReply.getLogLastTerm());
1226
1227         follower.close();
1228     }
1229
1230     @Test
1231     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1232         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1233
1234         MockRaftActorContext leaderActorContext = createActorContext();
1235
1236         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1237         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1238
1239         Follower follower = new Follower(followerActorContext);
1240         followerActor.underlyingActor().setBehavior(follower);
1241
1242         Map<String, String> leaderPeerAddresses = new HashMap<>();
1243         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1244
1245         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1246
1247         leaderActorContext.getReplicatedLog().removeFrom(0);
1248
1249         leaderActorContext.setReplicatedLog(
1250                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1251
1252         leaderActorContext.setCommitIndex(1);
1253
1254         followerActorContext.getReplicatedLog().removeFrom(0);
1255
1256         followerActorContext.setReplicatedLog(
1257                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1258
1259         // follower has the same log entries but its commit index > leaders commit index
1260         followerActorContext.setCommitIndex(2);
1261
1262         leader = new Leader(leaderActorContext);
1263
1264         // Initial heartbeat
1265         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1266
1267         assertEquals(1, appendEntries.getLeaderCommit());
1268         assertEquals(0, appendEntries.getEntries().size());
1269         assertEquals(0, appendEntries.getPrevLogIndex());
1270
1271         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1272                 leaderActor, AppendEntriesReply.class);
1273
1274         assertEquals(2, appendEntriesReply.getLogLastIndex());
1275         assertEquals(1, appendEntriesReply.getLogLastTerm());
1276
1277         leaderActor.underlyingActor().setBehavior(follower);
1278         leader.handleMessage(followerActor, appendEntriesReply);
1279
1280         leaderActor.underlyingActor().clear();
1281         followerActor.underlyingActor().clear();
1282
1283         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1284                 TimeUnit.MILLISECONDS);
1285
1286         leader.handleMessage(leaderActor, new SendHeartBeat());
1287
1288         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1289
1290         assertEquals(2, appendEntries.getLeaderCommit());
1291         assertEquals(0, appendEntries.getEntries().size());
1292         assertEquals(2, appendEntries.getPrevLogIndex());
1293
1294         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1295
1296         assertEquals(2, appendEntriesReply.getLogLastIndex());
1297         assertEquals(1, appendEntriesReply.getLogLastTerm());
1298
1299         assertEquals(2, followerActorContext.getCommitIndex());
1300
1301         follower.close();
1302     }
1303
1304     @Test
1305     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1306         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1307
1308         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1309         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1310                 new FiniteDuration(1000, TimeUnit.SECONDS));
1311
1312         leaderActorContext.setReplicatedLog(
1313                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1314         long leaderCommitIndex = 2;
1315         leaderActorContext.setCommitIndex(leaderCommitIndex);
1316         leaderActorContext.setLastApplied(leaderCommitIndex);
1317
1318         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1319         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1320
1321         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1322
1323         followerActorContext.setReplicatedLog(
1324                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1325         followerActorContext.setCommitIndex(0);
1326         followerActorContext.setLastApplied(0);
1327
1328         Follower follower = new Follower(followerActorContext);
1329         followerActor.underlyingActor().setBehavior(follower);
1330
1331         leader = new Leader(leaderActorContext);
1332
1333         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1334         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1335
1336         MessageCollectorActor.clearMessages(followerActor);
1337         MessageCollectorActor.clearMessages(leaderActor);
1338
1339         // Verify initial AppendEntries sent with the leader's current commit index.
1340         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1341         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1342         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1343
1344         leaderActor.underlyingActor().setBehavior(leader);
1345
1346         leader.handleMessage(followerActor, appendEntriesReply);
1347
1348         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1349         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1350
1351         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1352         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1353         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1354
1355         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1356         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1357                 appendEntries.getEntries().get(0).getData());
1358         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1359         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1360                 appendEntries.getEntries().get(1).getData());
1361
1362         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1363         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1364
1365         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1366
1367         ApplyState applyState = applyStateList.get(0);
1368         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1369         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1370         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1371                 applyState.getReplicatedLogEntry().getData());
1372
1373         applyState = applyStateList.get(1);
1374         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1375         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1376         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1377                 applyState.getReplicatedLogEntry().getData());
1378
1379         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1380         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1381     }
1382
1383     @Test
1384     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1385         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1386
1387         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1388         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1389                 new FiniteDuration(1000, TimeUnit.SECONDS));
1390
1391         leaderActorContext.setReplicatedLog(
1392                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1393         long leaderCommitIndex = 1;
1394         leaderActorContext.setCommitIndex(leaderCommitIndex);
1395         leaderActorContext.setLastApplied(leaderCommitIndex);
1396
1397         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1398         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1399
1400         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1401
1402         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1403         followerActorContext.setCommitIndex(-1);
1404         followerActorContext.setLastApplied(-1);
1405
1406         Follower follower = new Follower(followerActorContext);
1407         followerActor.underlyingActor().setBehavior(follower);
1408
1409         leader = new Leader(leaderActorContext);
1410
1411         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1412         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1413
1414         MessageCollectorActor.clearMessages(followerActor);
1415         MessageCollectorActor.clearMessages(leaderActor);
1416
1417         // Verify initial AppendEntries sent with the leader's current commit index.
1418         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1419         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1420         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1421
1422         leaderActor.underlyingActor().setBehavior(leader);
1423
1424         leader.handleMessage(followerActor, appendEntriesReply);
1425
1426         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1427         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1428
1429         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1430         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1431         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1432
1433         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1434         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1435                 appendEntries.getEntries().get(0).getData());
1436         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1437         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1438                 appendEntries.getEntries().get(1).getData());
1439
1440         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1441         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1442
1443         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1444
1445         ApplyState applyState = applyStateList.get(0);
1446         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1447         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1448         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1449                 applyState.getReplicatedLogEntry().getData());
1450
1451         applyState = applyStateList.get(1);
1452         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1453         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1454         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1455                 applyState.getReplicatedLogEntry().getData());
1456
1457         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1458         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1459     }
1460
1461     @Test
1462     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1463         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1464
1465         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1466         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1467                 new FiniteDuration(1000, TimeUnit.SECONDS));
1468
1469         leaderActorContext.setReplicatedLog(
1470                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1471         long leaderCommitIndex = 1;
1472         leaderActorContext.setCommitIndex(leaderCommitIndex);
1473         leaderActorContext.setLastApplied(leaderCommitIndex);
1474
1475         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1476         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1477
1478         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1479
1480         followerActorContext.setReplicatedLog(
1481                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1482         followerActorContext.setCommitIndex(-1);
1483         followerActorContext.setLastApplied(-1);
1484
1485         Follower follower = new Follower(followerActorContext);
1486         followerActor.underlyingActor().setBehavior(follower);
1487
1488         leader = new Leader(leaderActorContext);
1489
1490         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1491         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1492
1493         MessageCollectorActor.clearMessages(followerActor);
1494         MessageCollectorActor.clearMessages(leaderActor);
1495
1496         // Verify initial AppendEntries sent with the leader's current commit index.
1497         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1498         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1499         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1500
1501         leaderActor.underlyingActor().setBehavior(leader);
1502
1503         leader.handleMessage(followerActor, appendEntriesReply);
1504
1505         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1506         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1507
1508         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1509         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1510         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1511
1512         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1513         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1514         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1515                 appendEntries.getEntries().get(0).getData());
1516         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1517         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1518         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1519                 appendEntries.getEntries().get(1).getData());
1520
1521         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1522         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1523
1524         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1525
1526         ApplyState applyState = applyStateList.get(0);
1527         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1528         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1529         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1530                 applyState.getReplicatedLogEntry().getData());
1531
1532         applyState = applyStateList.get(1);
1533         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1534         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1535         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1536                 applyState.getReplicatedLogEntry().getData());
1537
1538         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1539         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1540         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1541     }
1542
1543     @Test
1544     public void testHandleAppendEntriesReplyWithNewerTerm(){
1545         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1546
1547         MockRaftActorContext leaderActorContext = createActorContext();
1548         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1549                 new FiniteDuration(10000, TimeUnit.SECONDS));
1550
1551         leaderActorContext.setReplicatedLog(
1552                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1553
1554         leader = new Leader(leaderActorContext);
1555         leaderActor.underlyingActor().setBehavior(leader);
1556         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1557
1558         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1559
1560         assertEquals(false, appendEntriesReply.isSuccess());
1561         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1562
1563         MessageCollectorActor.clearMessages(leaderActor);
1564     }
1565
1566     @Test
1567     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1568         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1569
1570         MockRaftActorContext leaderActorContext = createActorContext();
1571         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1572                 new FiniteDuration(10000, TimeUnit.SECONDS));
1573
1574         leaderActorContext.setReplicatedLog(
1575                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1576         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1577
1578         leader = new Leader(leaderActorContext);
1579         leaderActor.underlyingActor().setBehavior(leader);
1580         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1581
1582         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1583
1584         assertEquals(false, appendEntriesReply.isSuccess());
1585         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1586
1587         MessageCollectorActor.clearMessages(leaderActor);
1588     }
1589
1590     @Test
1591     public void testHandleAppendEntriesReplySuccess() throws Exception {
1592         logStart("testHandleAppendEntriesReplySuccess");
1593
1594         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1595
1596         leaderActorContext.setReplicatedLog(
1597                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1598
1599         leaderActorContext.setCommitIndex(1);
1600         leaderActorContext.setLastApplied(1);
1601         leaderActorContext.getTermInformation().update(1, "leader");
1602
1603         leader = new Leader(leaderActorContext);
1604
1605         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1606
1607         short payloadVersion = 5;
1608         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1609
1610         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1611
1612         assertEquals(RaftState.Leader, raftActorBehavior.state());
1613
1614         assertEquals(2, leaderActorContext.getCommitIndex());
1615
1616         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1617                 leaderActor, ApplyJournalEntries.class);
1618
1619         assertEquals(2, leaderActorContext.getLastApplied());
1620
1621         assertEquals(2, applyJournalEntries.getToIndex());
1622
1623         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1624                 ApplyState.class);
1625
1626         assertEquals(1,applyStateList.size());
1627
1628         ApplyState applyState = applyStateList.get(0);
1629
1630         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1631
1632         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1633         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1634     }
1635
1636     @Test
1637     public void testHandleAppendEntriesReplyUnknownFollower(){
1638         logStart("testHandleAppendEntriesReplyUnknownFollower");
1639
1640         MockRaftActorContext leaderActorContext = createActorContext();
1641
1642         leader = new Leader(leaderActorContext);
1643
1644         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1645
1646         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1647
1648         assertEquals(RaftState.Leader, raftActorBehavior.state());
1649     }
1650
1651     @Test
1652     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1653         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1654
1655         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1656         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1657                 new FiniteDuration(1000, TimeUnit.SECONDS));
1658         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1659
1660         leaderActorContext.setReplicatedLog(
1661                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1662         long leaderCommitIndex = 3;
1663         leaderActorContext.setCommitIndex(leaderCommitIndex);
1664         leaderActorContext.setLastApplied(leaderCommitIndex);
1665
1666         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1667         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1668         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1669         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1670
1671         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1672
1673         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1674         followerActorContext.setCommitIndex(-1);
1675         followerActorContext.setLastApplied(-1);
1676
1677         Follower follower = new Follower(followerActorContext);
1678         followerActor.underlyingActor().setBehavior(follower);
1679
1680         leader = new Leader(leaderActorContext);
1681
1682         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1683         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1684
1685         MessageCollectorActor.clearMessages(followerActor);
1686         MessageCollectorActor.clearMessages(leaderActor);
1687
1688         // Verify initial AppendEntries sent with the leader's current commit index.
1689         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1690         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1691         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1692
1693         leaderActor.underlyingActor().setBehavior(leader);
1694
1695         leader.handleMessage(followerActor, appendEntriesReply);
1696
1697         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1698         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1699
1700         appendEntries = appendEntriesList.get(0);
1701         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1702         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1703         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1704
1705         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1706         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1707                 appendEntries.getEntries().get(0).getData());
1708         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1709         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1710                 appendEntries.getEntries().get(1).getData());
1711
1712         appendEntries = appendEntriesList.get(1);
1713         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1714         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1715         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1716
1717         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1718         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1719                 appendEntries.getEntries().get(0).getData());
1720         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1721         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1722                 appendEntries.getEntries().get(1).getData());
1723
1724         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1725         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1726
1727         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1728
1729         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1730         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1731     }
1732
1733     @Test
1734     public void testHandleRequestVoteReply(){
1735         logStart("testHandleRequestVoteReply");
1736
1737         MockRaftActorContext leaderActorContext = createActorContext();
1738
1739         leader = new Leader(leaderActorContext);
1740
1741         // Should be a no-op.
1742         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1743                 new RequestVoteReply(1, true));
1744
1745         assertEquals(RaftState.Leader, raftActorBehavior.state());
1746
1747         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1748
1749         assertEquals(RaftState.Leader, raftActorBehavior.state());
1750     }
1751
1752     @Test
1753     public void testIsolatedLeaderCheckNoFollowers() {
1754         logStart("testIsolatedLeaderCheckNoFollowers");
1755
1756         MockRaftActorContext leaderActorContext = createActorContext();
1757
1758         leader = new Leader(leaderActorContext);
1759         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1760         Assert.assertTrue(behavior instanceof Leader);
1761     }
1762
1763     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1764         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1765         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1766
1767         MockRaftActorContext leaderActorContext = createActorContext();
1768
1769         Map<String, String> peerAddresses = new HashMap<>();
1770         peerAddresses.put("follower-1", followerActor1.path().toString());
1771         peerAddresses.put("follower-2", followerActor2.path().toString());
1772
1773         leaderActorContext.setPeerAddresses(peerAddresses);
1774         leaderActorContext.setRaftPolicy(raftPolicy);
1775
1776         leader = new Leader(leaderActorContext);
1777
1778         leader.markFollowerActive("follower-1");
1779         leader.markFollowerActive("follower-2");
1780         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1781         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1782                 behavior instanceof Leader);
1783
1784         // kill 1 follower and verify if that got killed
1785         final JavaTestKit probe = new JavaTestKit(getSystem());
1786         probe.watch(followerActor1);
1787         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1788         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1789         assertEquals(termMsg1.getActor(), followerActor1);
1790
1791         leader.markFollowerInActive("follower-1");
1792         leader.markFollowerActive("follower-2");
1793         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1794         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1795                 behavior instanceof Leader);
1796
1797         // kill 2nd follower and leader should change to Isolated leader
1798         followerActor2.tell(PoisonPill.getInstance(), null);
1799         probe.watch(followerActor2);
1800         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1801         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1802         assertEquals(termMsg2.getActor(), followerActor2);
1803
1804         leader.markFollowerInActive("follower-2");
1805         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1806     }
1807
1808     @Test
1809     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1810         logStart("testIsolatedLeaderCheckTwoFollowers");
1811
1812         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1813
1814         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1815             behavior instanceof IsolatedLeader);
1816     }
1817
1818     @Test
1819     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1820         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1821
1822         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1823
1824         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1825                 behavior instanceof Leader);
1826     }
1827
1828     @Test
1829     public void testLaggingFollowerStarvation() throws Exception {
1830         logStart("testLaggingFollowerStarvation");
1831         new JavaTestKit(getSystem()) {{
1832             String leaderActorId = actorFactory.generateActorId("leader");
1833             String follower1ActorId = actorFactory.generateActorId("follower");
1834             String follower2ActorId = actorFactory.generateActorId("follower");
1835
1836             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1837                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1838             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1839             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1840
1841             MockRaftActorContext leaderActorContext =
1842                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1843
1844             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1845             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1846             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1847
1848             leaderActorContext.setConfigParams(configParams);
1849
1850             leaderActorContext.setReplicatedLog(
1851                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1852
1853             Map<String, String> peerAddresses = new HashMap<>();
1854             peerAddresses.put(follower1ActorId,
1855                     follower1Actor.path().toString());
1856             peerAddresses.put(follower2ActorId,
1857                     follower2Actor.path().toString());
1858
1859             leaderActorContext.setPeerAddresses(peerAddresses);
1860             leaderActorContext.getTermInformation().update(1, leaderActorId);
1861
1862             RaftActorBehavior leader = createBehavior(leaderActorContext);
1863
1864             leaderActor.underlyingActor().setBehavior(leader);
1865
1866             for(int i=1;i<6;i++) {
1867                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1868                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1869                 assertTrue(newBehavior == leader);
1870                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1871             }
1872
1873             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1874             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1875
1876             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1877                     heartbeats.size() > 1);
1878
1879             // Check if follower-2 got AppendEntries during this time and was not starved
1880             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1881
1882             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1883                     appendEntries.size() > 1);
1884
1885         }};
1886     }
1887
1888     @Test
1889     public void testReplicationConsensusWithNonVotingFollower() {
1890         logStart("testReplicationConsensusWithNonVotingFollower");
1891
1892         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1893         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1894                 new FiniteDuration(1000, TimeUnit.SECONDS));
1895
1896         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1897
1898         String nonVotingFollowerId = "nonvoting-follower";
1899         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1900                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1901
1902         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1903
1904         leader = new Leader(leaderActorContext);
1905
1906         // Ignore initial heartbeats
1907         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1908         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1909
1910         MessageCollectorActor.clearMessages(followerActor);
1911         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1912         MessageCollectorActor.clearMessages(leaderActor);
1913
1914         // Send a Replicate message and wait for AppendEntries.
1915         sendReplicate(leaderActorContext, 0);
1916
1917         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1918         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1919
1920         // Send reply only from the voting follower and verify consensus via ApplyState.
1921         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1922
1923         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1924
1925         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1926
1927         MessageCollectorActor.clearMessages(followerActor);
1928         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1929         MessageCollectorActor.clearMessages(leaderActor);
1930
1931         // Send another Replicate message
1932         sendReplicate(leaderActorContext, 1);
1933
1934         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1935         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1936                 AppendEntries.class);
1937         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1938         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1939
1940         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1941         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1942
1943         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
1944
1945         // Send reply from the voting follower and verify consensus.
1946         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1947
1948         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1949     }
1950
1951     @Test
1952     public void testTransferLeadershipWithFollowerInSync() {
1953         logStart("testTransferLeadershipWithFollowerInSync");
1954
1955         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1956         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1957                 new FiniteDuration(1000, TimeUnit.SECONDS));
1958         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1959
1960         leader = new Leader(leaderActorContext);
1961
1962         // Initial heartbeat
1963         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1964         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
1965         MessageCollectorActor.clearMessages(followerActor);
1966
1967         sendReplicate(leaderActorContext, 0);
1968         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1969
1970         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1971         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1972         MessageCollectorActor.clearMessages(followerActor);
1973
1974         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
1975         leader.transferLeadership(mockTransferCohort);
1976
1977         verify(mockTransferCohort, never()).transferComplete();
1978         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1979         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1980
1981         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
1982         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1983
1984         // Leader should force an election timeout
1985         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
1986
1987         verify(mockTransferCohort).transferComplete();
1988     }
1989
1990     @Test
1991     public void testTransferLeadershipWithEmptyLog() {
1992         logStart("testTransferLeadershipWithEmptyLog");
1993
1994         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1995         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1996                 new FiniteDuration(1000, TimeUnit.SECONDS));
1997         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1998
1999         leader = new Leader(leaderActorContext);
2000
2001         // Initial heartbeat
2002         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2003         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2004         MessageCollectorActor.clearMessages(followerActor);
2005
2006         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2007         leader.transferLeadership(mockTransferCohort);
2008
2009         verify(mockTransferCohort, never()).transferComplete();
2010         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2011         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2012
2013         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2014         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2015
2016         // Leader should force an election timeout
2017         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2018
2019         verify(mockTransferCohort).transferComplete();
2020     }
2021
2022     @Test
2023     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2024         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2025
2026         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2027         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2028                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2029
2030         leader = new Leader(leaderActorContext);
2031
2032         // Initial heartbeat
2033         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2034         MessageCollectorActor.clearMessages(followerActor);
2035
2036         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2037         leader.transferLeadership(mockTransferCohort);
2038
2039         verify(mockTransferCohort, never()).transferComplete();
2040
2041         // Sync up the follower.
2042         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2043         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2044         MessageCollectorActor.clearMessages(followerActor);
2045
2046         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2047                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2048         leader.handleMessage(leaderActor, new SendHeartBeat());
2049         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2050         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2051
2052         // Leader should force an election timeout
2053         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2054
2055         verify(mockTransferCohort).transferComplete();
2056     }
2057
2058     @Test
2059     public void testTransferLeadershipWithFollowerSyncTimeout() {
2060         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2061
2062         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2063         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2064                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2065         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2066         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2067
2068         leader = new Leader(leaderActorContext);
2069
2070         // Initial heartbeat
2071         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2072         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2073         MessageCollectorActor.clearMessages(followerActor);
2074
2075         sendReplicate(leaderActorContext, 0);
2076         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2077
2078         MessageCollectorActor.clearMessages(followerActor);
2079
2080         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2081         leader.transferLeadership(mockTransferCohort);
2082
2083         verify(mockTransferCohort, never()).transferComplete();
2084
2085         // Send heartbeats to time out the transfer.
2086         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2087             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2088                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2089             leader.handleMessage(leaderActor, new SendHeartBeat());
2090         }
2091
2092         verify(mockTransferCohort).abortTransfer();
2093         verify(mockTransferCohort, never()).transferComplete();
2094         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2095     }
2096
2097     @Override
2098     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2099             ActorRef actorRef, RaftRPC rpc) throws Exception {
2100         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2101         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2102     }
2103
2104     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2105
2106         private final long electionTimeOutIntervalMillis;
2107         private final int snapshotChunkSize;
2108
2109         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2110             super();
2111             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2112             this.snapshotChunkSize = snapshotChunkSize;
2113         }
2114
2115         @Override
2116         public FiniteDuration getElectionTimeOutInterval() {
2117             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2118         }
2119
2120         @Override
2121         public int getSnapshotChunkSize() {
2122             return snapshotChunkSize;
2123         }
2124     }
2125 }