BUG 4017: Notification publish service is not available from provider context
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorRef;
19 import akka.actor.PoisonPill;
20 import akka.actor.Props;
21 import akka.actor.Terminated;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.google.protobuf.ByteString;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.After;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
36 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
40 import org.opendaylight.controller.cluster.raft.RaftState;
41 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
43 import org.opendaylight.controller.cluster.raft.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.Snapshot;
45 import org.opendaylight.controller.cluster.raft.VotingState;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
47 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
50 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
51 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
52 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
53 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
54 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
56 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
57 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
58 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
60 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
61 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
62 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
63 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
64 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
65 import scala.concurrent.duration.FiniteDuration;
66
67 public class LeaderTest extends AbstractLeaderTest {
68
69     static final String FOLLOWER_ID = "follower";
70     public static final String LEADER_ID = "leader";
71
72     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
73             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
74
75     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
76             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
77
78     private Leader leader;
79     private final short payloadVersion = 5;
80
81     @Override
82     @After
83     public void tearDown() throws Exception {
84         if(leader != null) {
85             leader.close();
86         }
87
88         super.tearDown();
89     }
90
91     @Test
92     public void testHandleMessageForUnknownMessage() throws Exception {
93         logStart("testHandleMessageForUnknownMessage");
94
95         leader = new Leader(createActorContext());
96
97         // handle message should return the Leader state when it receives an
98         // unknown message
99         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
100         Assert.assertTrue(behavior instanceof Leader);
101     }
102
103     @Test
104     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
105         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
106
107         MockRaftActorContext actorContext = createActorContextWithFollower();
108         short payloadVersion = (short)5;
109         actorContext.setPayloadVersion(payloadVersion);
110
111         long term = 1;
112         actorContext.getTermInformation().update(term, "");
113
114         leader = new Leader(actorContext);
115
116         // Leader should send an immediate heartbeat with no entries as follower is inactive.
117         long lastIndex = actorContext.getReplicatedLog().lastIndex();
118         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119         assertEquals("getTerm", term, appendEntries.getTerm());
120         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
121         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
122         assertEquals("Entries size", 0, appendEntries.getEntries().size());
123         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
124
125         // The follower would normally reply - simulate that explicitly here.
126         leader.handleMessage(followerActor, new AppendEntriesReply(
127                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
128         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
129
130         followerActor.underlyingActor().clear();
131
132         // Sleep for the heartbeat interval so AppendEntries is sent.
133         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
134                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
135
136         leader.handleMessage(leaderActor, new SendHeartBeat());
137
138         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
139         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
140         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
141         assertEquals("Entries size", 1, appendEntries.getEntries().size());
142         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
143         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
144         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
145     }
146
147
148     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
149         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
150         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
151                 1, index, payload);
152         actorContext.getReplicatedLog().append(newEntry);
153         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
154     }
155
156     @Test
157     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
158         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
159
160         MockRaftActorContext actorContext = createActorContextWithFollower();
161
162         long term = 1;
163         actorContext.getTermInformation().update(term, "");
164
165         leader = new Leader(actorContext);
166
167         // Leader will send an immediate heartbeat - ignore it.
168         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
169
170         // The follower would normally reply - simulate that explicitly here.
171         long lastIndex = actorContext.getReplicatedLog().lastIndex();
172         leader.handleMessage(followerActor, new AppendEntriesReply(
173                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
174         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
175
176         followerActor.underlyingActor().clear();
177
178         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
179
180         // State should not change
181         assertTrue(raftBehavior instanceof Leader);
182
183         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
184         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
185         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
186         assertEquals("Entries size", 1, appendEntries.getEntries().size());
187         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
188         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
189         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
190         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
191     }
192
193     @Test
194     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
195         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
196
197         MockRaftActorContext actorContext = createActorContextWithFollower();
198         actorContext.setRaftPolicy(createRaftPolicy(true, true));
199
200         long term = 1;
201         actorContext.getTermInformation().update(term, "");
202
203         leader = new Leader(actorContext);
204
205         // Leader will send an immediate heartbeat - ignore it.
206         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
207
208         // The follower would normally reply - simulate that explicitly here.
209         long lastIndex = actorContext.getReplicatedLog().lastIndex();
210         leader.handleMessage(followerActor, new AppendEntriesReply(
211                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
212         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
213
214         followerActor.underlyingActor().clear();
215
216         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
217
218         // State should not change
219         assertTrue(raftBehavior instanceof Leader);
220
221         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
222         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
223         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
224         assertEquals("Entries size", 1, appendEntries.getEntries().size());
225         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
226         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
227         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
228         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
229     }
230
231     @Test
232     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
233         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
234
235         MockRaftActorContext actorContext = createActorContextWithFollower();
236         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
237             @Override
238             public FiniteDuration getHeartBeatInterval() {
239                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
240             }
241         });
242
243         long term = 1;
244         actorContext.getTermInformation().update(term, "");
245
246         leader = new Leader(actorContext);
247
248         // Leader will send an immediate heartbeat - ignore it.
249         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
250
251         // The follower would normally reply - simulate that explicitly here.
252         long lastIndex = actorContext.getReplicatedLog().lastIndex();
253         leader.handleMessage(followerActor, new AppendEntriesReply(
254                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
255         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
256
257         followerActor.underlyingActor().clear();
258
259         for(int i=0;i<5;i++) {
260             sendReplicate(actorContext, lastIndex+i+1);
261         }
262
263         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
264         // We expect only 1 message to be sent because of two reasons,
265         // - an append entries reply was not received
266         // - the heartbeat interval has not expired
267         // In this scenario if multiple messages are sent they would likely be duplicates
268         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
269     }
270
271     @Test
272     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
273         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
274
275         MockRaftActorContext actorContext = createActorContextWithFollower();
276         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
277             @Override
278             public FiniteDuration getHeartBeatInterval() {
279                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
280             }
281         });
282
283         long term = 1;
284         actorContext.getTermInformation().update(term, "");
285
286         leader = new Leader(actorContext);
287
288         // Leader will send an immediate heartbeat - ignore it.
289         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
290
291         // The follower would normally reply - simulate that explicitly here.
292         long lastIndex = actorContext.getReplicatedLog().lastIndex();
293         leader.handleMessage(followerActor, new AppendEntriesReply(
294                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
295         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
296
297         followerActor.underlyingActor().clear();
298
299         for(int i=0;i<3;i++) {
300             sendReplicate(actorContext, lastIndex+i+1);
301             leader.handleMessage(followerActor, new AppendEntriesReply(
302                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
303
304         }
305
306         for(int i=3;i<5;i++) {
307             sendReplicate(actorContext, lastIndex + i + 1);
308         }
309
310         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
311         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
312         // get sent to the follower - but not the 5th
313         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
314
315         for(int i=0;i<4;i++) {
316             long expected = allMessages.get(i).getEntries().get(0).getIndex();
317             assertEquals(expected, i+2);
318         }
319     }
320
321     @Test
322     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
323         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
324
325         MockRaftActorContext actorContext = createActorContextWithFollower();
326         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
327             @Override
328             public FiniteDuration getHeartBeatInterval() {
329                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
330             }
331         });
332
333         long term = 1;
334         actorContext.getTermInformation().update(term, "");
335
336         leader = new Leader(actorContext);
337
338         // Leader will send an immediate heartbeat - ignore it.
339         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
340
341         // The follower would normally reply - simulate that explicitly here.
342         long lastIndex = actorContext.getReplicatedLog().lastIndex();
343         leader.handleMessage(followerActor, new AppendEntriesReply(
344                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
345         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
346
347         followerActor.underlyingActor().clear();
348
349         sendReplicate(actorContext, lastIndex+1);
350
351         // Wait slightly longer than heartbeat duration
352         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
353
354         leader.handleMessage(leaderActor, new SendHeartBeat());
355
356         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
357         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
358
359         assertEquals(1, allMessages.get(0).getEntries().size());
360         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
361         assertEquals(1, allMessages.get(1).getEntries().size());
362         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
363
364     }
365
366     @Test
367     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
368         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
369
370         MockRaftActorContext actorContext = createActorContextWithFollower();
371         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
372             @Override
373             public FiniteDuration getHeartBeatInterval() {
374                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
375             }
376         });
377
378         long term = 1;
379         actorContext.getTermInformation().update(term, "");
380
381         leader = new Leader(actorContext);
382
383         // Leader will send an immediate heartbeat - ignore it.
384         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
385
386         // The follower would normally reply - simulate that explicitly here.
387         long lastIndex = actorContext.getReplicatedLog().lastIndex();
388         leader.handleMessage(followerActor, new AppendEntriesReply(
389                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
390         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
391
392         followerActor.underlyingActor().clear();
393
394         for(int i=0;i<3;i++) {
395             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
396             leader.handleMessage(leaderActor, new SendHeartBeat());
397         }
398
399         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
400         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
401     }
402
403     @Test
404     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
405         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
406
407         MockRaftActorContext actorContext = createActorContextWithFollower();
408         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
409             @Override
410             public FiniteDuration getHeartBeatInterval() {
411                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
412             }
413         });
414
415         long term = 1;
416         actorContext.getTermInformation().update(term, "");
417
418         leader = new Leader(actorContext);
419
420         // Leader will send an immediate heartbeat - ignore it.
421         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
422
423         // The follower would normally reply - simulate that explicitly here.
424         long lastIndex = actorContext.getReplicatedLog().lastIndex();
425         leader.handleMessage(followerActor, new AppendEntriesReply(
426                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
427         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
428
429         followerActor.underlyingActor().clear();
430
431         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
432         leader.handleMessage(leaderActor, new SendHeartBeat());
433         sendReplicate(actorContext, lastIndex+1);
434
435         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
436         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
437
438         assertEquals(0, allMessages.get(0).getEntries().size());
439         assertEquals(1, allMessages.get(1).getEntries().size());
440     }
441
442
443     @Test
444     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
445         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
446
447         MockRaftActorContext actorContext = createActorContext();
448
449         leader = new Leader(actorContext);
450
451         actorContext.setLastApplied(0);
452
453         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
454         long term = actorContext.getTermInformation().getCurrentTerm();
455         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
456                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
457
458         actorContext.getReplicatedLog().append(newEntry);
459
460         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
461                 new Replicate(leaderActor, "state-id", newEntry));
462
463         // State should not change
464         assertTrue(raftBehavior instanceof Leader);
465
466         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
467
468         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
469         // one since lastApplied state is 0.
470         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
471                 leaderActor, ApplyState.class);
472         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
473
474         for(int i = 0; i <= newLogIndex - 1; i++ ) {
475             ApplyState applyState = applyStateList.get(i);
476             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
477             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
478         }
479
480         ApplyState last = applyStateList.get((int) newLogIndex - 1);
481         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
482         assertEquals("getIdentifier", "state-id", last.getIdentifier());
483     }
484
485     @Test
486     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
487         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
488
489         MockRaftActorContext actorContext = createActorContextWithFollower();
490
491         Map<String, String> leadersSnapshot = new HashMap<>();
492         leadersSnapshot.put("1", "A");
493         leadersSnapshot.put("2", "B");
494         leadersSnapshot.put("3", "C");
495
496         //clears leaders log
497         actorContext.getReplicatedLog().removeFrom(0);
498
499         final int commitIndex = 3;
500         final int snapshotIndex = 2;
501         final int newEntryIndex = 4;
502         final int snapshotTerm = 1;
503         final int currentTerm = 2;
504
505         // set the snapshot variables in replicatedlog
506         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
507         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
508         actorContext.setCommitIndex(commitIndex);
509         //set follower timeout to 2 mins, helps during debugging
510         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
511
512         leader = new Leader(actorContext);
513
514         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
515         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
516
517         // new entry
518         ReplicatedLogImplEntry entry =
519                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
520                         new MockRaftActorContext.MockPayload("D"));
521
522         //update follower timestamp
523         leader.markFollowerActive(FOLLOWER_ID);
524
525         ByteString bs = toByteString(leadersSnapshot);
526         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
527                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
528         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
529         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
530
531         //send first chunk and no InstallSnapshotReply received yet
532         fts.getNextChunk();
533         fts.incrementChunkIndex();
534
535         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
536                 TimeUnit.MILLISECONDS);
537
538         leader.handleMessage(leaderActor, new SendHeartBeat());
539
540         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
541
542         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
543
544         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
545
546         //InstallSnapshotReply received
547         fts.markSendStatus(true);
548
549         leader.handleMessage(leaderActor, new SendHeartBeat());
550
551         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
552
553         assertEquals(commitIndex, is.getLastIncludedIndex());
554     }
555
556     @Test
557     public void testSendAppendEntriesSnapshotScenario() throws Exception {
558         logStart("testSendAppendEntriesSnapshotScenario");
559
560         MockRaftActorContext actorContext = createActorContextWithFollower();
561
562         Map<String, String> leadersSnapshot = new HashMap<>();
563         leadersSnapshot.put("1", "A");
564         leadersSnapshot.put("2", "B");
565         leadersSnapshot.put("3", "C");
566
567         //clears leaders log
568         actorContext.getReplicatedLog().removeFrom(0);
569
570         final int followersLastIndex = 2;
571         final int snapshotIndex = 3;
572         final int newEntryIndex = 4;
573         final int snapshotTerm = 1;
574         final int currentTerm = 2;
575
576         // set the snapshot variables in replicatedlog
577         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
578         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
579         actorContext.setCommitIndex(followersLastIndex);
580
581         leader = new Leader(actorContext);
582
583         // Leader will send an immediate heartbeat - ignore it.
584         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
585
586         // new entry
587         ReplicatedLogImplEntry entry =
588                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
589                         new MockRaftActorContext.MockPayload("D"));
590
591         actorContext.getReplicatedLog().append(entry);
592
593         //update follower timestamp
594         leader.markFollowerActive(FOLLOWER_ID);
595
596         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
597         RaftActorBehavior raftBehavior = leader.handleMessage(
598                 leaderActor, new Replicate(null, "state-id", entry));
599
600         assertTrue(raftBehavior instanceof Leader);
601
602         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
603     }
604
605     @Test
606     public void testInitiateInstallSnapshot() throws Exception {
607         logStart("testInitiateInstallSnapshot");
608
609         MockRaftActorContext actorContext = createActorContextWithFollower();
610
611         //clears leaders log
612         actorContext.getReplicatedLog().removeFrom(0);
613
614         final int followersLastIndex = 2;
615         final int snapshotIndex = 3;
616         final int newEntryIndex = 4;
617         final int snapshotTerm = 1;
618         final int currentTerm = 2;
619
620         // set the snapshot variables in replicatedlog
621         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
622         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
623         actorContext.setLastApplied(3);
624         actorContext.setCommitIndex(followersLastIndex);
625
626         leader = new Leader(actorContext);
627
628         // Leader will send an immediate heartbeat - ignore it.
629         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
630
631         // set the snapshot as absent and check if capture-snapshot is invoked.
632         leader.setSnapshot(null);
633
634         // new entry
635         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
636                 new MockRaftActorContext.MockPayload("D"));
637
638         actorContext.getReplicatedLog().append(entry);
639
640         //update follower timestamp
641         leader.markFollowerActive(FOLLOWER_ID);
642
643         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
644
645         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
646
647         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
648
649         assertTrue(cs.isInstallSnapshotInitiated());
650         assertEquals(3, cs.getLastAppliedIndex());
651         assertEquals(1, cs.getLastAppliedTerm());
652         assertEquals(4, cs.getLastIndex());
653         assertEquals(2, cs.getLastTerm());
654
655         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
656         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
657
658         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
659     }
660
661     @Test
662     public void testInitiateForceInstallSnapshot() throws Exception {
663         logStart("testInitiateForceInstallSnapshot");
664
665         MockRaftActorContext actorContext = createActorContextWithFollower();
666
667         final int followersLastIndex = 2;
668         final int snapshotIndex = -1;
669         final int newEntryIndex = 4;
670         final int snapshotTerm = -1;
671         final int currentTerm = 2;
672
673         // set the snapshot variables in replicatedlog
674         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
675         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
676         actorContext.setLastApplied(3);
677         actorContext.setCommitIndex(followersLastIndex);
678
679         actorContext.getReplicatedLog().removeFrom(0);
680
681         leader = new Leader(actorContext);
682
683         // Leader will send an immediate heartbeat - ignore it.
684         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
685
686         // set the snapshot as absent and check if capture-snapshot is invoked.
687         leader.setSnapshot(null);
688
689         for(int i=0;i<4;i++) {
690             actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
691                     new MockRaftActorContext.MockPayload("X" + i)));
692         }
693
694         // new entry
695         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
696                 new MockRaftActorContext.MockPayload("D"));
697
698         actorContext.getReplicatedLog().append(entry);
699
700         //update follower timestamp
701         leader.markFollowerActive(FOLLOWER_ID);
702
703         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
704         // installed with a SendInstallSnapshot
705         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
706
707         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
708
709         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
710
711         assertTrue(cs.isInstallSnapshotInitiated());
712         assertEquals(3, cs.getLastAppliedIndex());
713         assertEquals(1, cs.getLastAppliedTerm());
714         assertEquals(4, cs.getLastIndex());
715         assertEquals(2, cs.getLastTerm());
716
717         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
718         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
719
720         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
721     }
722
723
724     @Test
725     public void testInstallSnapshot() throws Exception {
726         logStart("testInstallSnapshot");
727
728         MockRaftActorContext actorContext = createActorContextWithFollower();
729
730         Map<String, String> leadersSnapshot = new HashMap<>();
731         leadersSnapshot.put("1", "A");
732         leadersSnapshot.put("2", "B");
733         leadersSnapshot.put("3", "C");
734
735         //clears leaders log
736         actorContext.getReplicatedLog().removeFrom(0);
737
738         final int lastAppliedIndex = 3;
739         final int snapshotIndex = 2;
740         final int snapshotTerm = 1;
741         final int currentTerm = 2;
742
743         // set the snapshot variables in replicatedlog
744         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
745         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
746         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
747         actorContext.setCommitIndex(lastAppliedIndex);
748         actorContext.setLastApplied(lastAppliedIndex);
749
750         leader = new Leader(actorContext);
751
752         // Initial heartbeat.
753         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
754
755         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
756         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
757
758         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
759                 Collections.<ReplicatedLogEntry>emptyList(),
760                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
761
762         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
763
764         assertTrue(raftBehavior instanceof Leader);
765
766         // check if installsnapshot gets called with the correct values.
767
768         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
769
770         assertNotNull(installSnapshot.getData());
771         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
772         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
773
774         assertEquals(currentTerm, installSnapshot.getTerm());
775     }
776
777     @Test
778     public void testForceInstallSnapshot() throws Exception {
779         logStart("testForceInstallSnapshot");
780
781         MockRaftActorContext actorContext = createActorContextWithFollower();
782
783         Map<String, String> leadersSnapshot = new HashMap<>();
784         leadersSnapshot.put("1", "A");
785         leadersSnapshot.put("2", "B");
786         leadersSnapshot.put("3", "C");
787
788         final int lastAppliedIndex = 3;
789         final int snapshotIndex = -1;
790         final int snapshotTerm = -1;
791         final int currentTerm = 2;
792
793         // set the snapshot variables in replicatedlog
794         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
795         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
796         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
797         actorContext.setCommitIndex(lastAppliedIndex);
798         actorContext.setLastApplied(lastAppliedIndex);
799
800         leader = new Leader(actorContext);
801
802         // Initial heartbeat.
803         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
804
805         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
806         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
807
808         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
809                 Collections.<ReplicatedLogEntry>emptyList(),
810                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
811
812         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
813
814         assertTrue(raftBehavior instanceof Leader);
815
816         // check if installsnapshot gets called with the correct values.
817
818         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
819
820         assertNotNull(installSnapshot.getData());
821         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
822         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
823
824         assertEquals(currentTerm, installSnapshot.getTerm());
825     }
826
827     @Test
828     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
829         logStart("testHandleInstallSnapshotReplyLastChunk");
830
831         MockRaftActorContext actorContext = createActorContextWithFollower();
832
833         final int commitIndex = 3;
834         final int snapshotIndex = 2;
835         final int snapshotTerm = 1;
836         final int currentTerm = 2;
837
838         actorContext.setCommitIndex(commitIndex);
839
840         leader = new Leader(actorContext);
841
842         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
843         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
844
845         // Ignore initial heartbeat.
846         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
847
848         Map<String, String> leadersSnapshot = new HashMap<>();
849         leadersSnapshot.put("1", "A");
850         leadersSnapshot.put("2", "B");
851         leadersSnapshot.put("3", "C");
852
853         // set the snapshot variables in replicatedlog
854
855         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
856         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
857         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
858
859         ByteString bs = toByteString(leadersSnapshot);
860         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
861                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
862         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
863         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
864         while(!fts.isLastChunk(fts.getChunkIndex())) {
865             fts.getNextChunk();
866             fts.incrementChunkIndex();
867         }
868
869         //clears leaders log
870         actorContext.getReplicatedLog().removeFrom(0);
871
872         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
873                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
874
875         assertTrue(raftBehavior instanceof Leader);
876
877         assertEquals(0, leader.followerSnapshotSize());
878         assertEquals(1, leader.followerLogSize());
879         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
880         assertNotNull(fli);
881         assertEquals(commitIndex, fli.getMatchIndex());
882         assertEquals(commitIndex + 1, fli.getNextIndex());
883     }
884
885     @Test
886     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
887         logStart("testSendSnapshotfromInstallSnapshotReply");
888
889         MockRaftActorContext actorContext = createActorContextWithFollower();
890
891         final int commitIndex = 3;
892         final int snapshotIndex = 2;
893         final int snapshotTerm = 1;
894         final int currentTerm = 2;
895
896         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
897             @Override
898             public int getSnapshotChunkSize() {
899                 return 50;
900             }
901         };
902         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
903         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
904
905         actorContext.setConfigParams(configParams);
906         actorContext.setCommitIndex(commitIndex);
907
908         leader = new Leader(actorContext);
909
910         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
911         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
912
913         Map<String, String> leadersSnapshot = new HashMap<>();
914         leadersSnapshot.put("1", "A");
915         leadersSnapshot.put("2", "B");
916         leadersSnapshot.put("3", "C");
917
918         // set the snapshot variables in replicatedlog
919         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
920         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
921         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
922
923         ByteString bs = toByteString(leadersSnapshot);
924         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
925                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
926         leader.setSnapshot(snapshot);
927
928         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
929
930         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
931
932         assertEquals(1, installSnapshot.getChunkIndex());
933         assertEquals(3, installSnapshot.getTotalChunks());
934
935         followerActor.underlyingActor().clear();
936         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
937                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
938
939         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
940
941         assertEquals(2, installSnapshot.getChunkIndex());
942         assertEquals(3, installSnapshot.getTotalChunks());
943
944         followerActor.underlyingActor().clear();
945         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
946                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
947
948         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
949
950         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
951         followerActor.underlyingActor().clear();
952         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
953                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
954
955         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
956
957         Assert.assertNull(installSnapshot);
958     }
959
960
961     @Test
962     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
963         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
964
965         MockRaftActorContext actorContext = createActorContextWithFollower();
966
967         final int commitIndex = 3;
968         final int snapshotIndex = 2;
969         final int snapshotTerm = 1;
970         final int currentTerm = 2;
971
972         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
973             @Override
974             public int getSnapshotChunkSize() {
975                 return 50;
976             }
977         });
978
979         actorContext.setCommitIndex(commitIndex);
980
981         leader = new Leader(actorContext);
982
983         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
984         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
985
986         Map<String, String> leadersSnapshot = new HashMap<>();
987         leadersSnapshot.put("1", "A");
988         leadersSnapshot.put("2", "B");
989         leadersSnapshot.put("3", "C");
990
991         // set the snapshot variables in replicatedlog
992         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
993         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
994         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
995
996         ByteString bs = toByteString(leadersSnapshot);
997         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
998                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
999         leader.setSnapshot(snapshot);
1000
1001         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1002         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1003
1004         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1005
1006         assertEquals(1, installSnapshot.getChunkIndex());
1007         assertEquals(3, installSnapshot.getTotalChunks());
1008
1009         followerActor.underlyingActor().clear();
1010
1011         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1012                 FOLLOWER_ID, -1, false));
1013
1014         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1015                 TimeUnit.MILLISECONDS);
1016
1017         leader.handleMessage(leaderActor, new SendHeartBeat());
1018
1019         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1020
1021         assertEquals(1, installSnapshot.getChunkIndex());
1022         assertEquals(3, installSnapshot.getTotalChunks());
1023     }
1024
1025     @Test
1026     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
1027         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1028
1029         MockRaftActorContext actorContext = createActorContextWithFollower();
1030
1031         final int commitIndex = 3;
1032         final int snapshotIndex = 2;
1033         final int snapshotTerm = 1;
1034         final int currentTerm = 2;
1035
1036         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1037             @Override
1038             public int getSnapshotChunkSize() {
1039                 return 50;
1040             }
1041         });
1042
1043         actorContext.setCommitIndex(commitIndex);
1044
1045         leader = new Leader(actorContext);
1046
1047         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1048         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1049
1050         Map<String, String> leadersSnapshot = new HashMap<>();
1051         leadersSnapshot.put("1", "A");
1052         leadersSnapshot.put("2", "B");
1053         leadersSnapshot.put("3", "C");
1054
1055         // set the snapshot variables in replicatedlog
1056         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1057         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1058         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1059
1060         ByteString bs = toByteString(leadersSnapshot);
1061         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
1062                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
1063         leader.setSnapshot(snapshot);
1064
1065         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
1066
1067         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1068
1069         assertEquals(1, installSnapshot.getChunkIndex());
1070         assertEquals(3, installSnapshot.getTotalChunks());
1071         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
1072
1073         int hashCode = installSnapshot.getData().hashCode();
1074
1075         followerActor.underlyingActor().clear();
1076
1077         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1078                 FOLLOWER_ID, 1, true));
1079
1080         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1081
1082         assertEquals(2, installSnapshot.getChunkIndex());
1083         assertEquals(3, installSnapshot.getTotalChunks());
1084         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1085     }
1086
1087     @Test
1088     public void testFollowerToSnapshotLogic() {
1089         logStart("testFollowerToSnapshotLogic");
1090
1091         MockRaftActorContext actorContext = createActorContext();
1092
1093         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1094             @Override
1095             public int getSnapshotChunkSize() {
1096                 return 50;
1097             }
1098         });
1099
1100         leader = new Leader(actorContext);
1101
1102         Map<String, String> leadersSnapshot = new HashMap<>();
1103         leadersSnapshot.put("1", "A");
1104         leadersSnapshot.put("2", "B");
1105         leadersSnapshot.put("3", "C");
1106
1107         ByteString bs = toByteString(leadersSnapshot);
1108         byte[] barray = bs.toByteArray();
1109
1110         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
1111         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
1112
1113         assertEquals(bs.size(), barray.length);
1114
1115         int chunkIndex=0;
1116         for (int i=0; i < barray.length; i = i + 50) {
1117             int j = i + 50;
1118             chunkIndex++;
1119
1120             if (i + 50 > barray.length) {
1121                 j = barray.length;
1122             }
1123
1124             ByteString chunk = fts.getNextChunk();
1125             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1126             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1127
1128             fts.markSendStatus(true);
1129             if (!fts.isLastChunk(chunkIndex)) {
1130                 fts.incrementChunkIndex();
1131             }
1132         }
1133
1134         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1135     }
1136
1137     @Override protected RaftActorBehavior createBehavior(
1138         RaftActorContext actorContext) {
1139         return new Leader(actorContext);
1140     }
1141
1142     @Override
1143     protected MockRaftActorContext createActorContext() {
1144         return createActorContext(leaderActor);
1145     }
1146
1147     @Override
1148     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1149         return createActorContext(LEADER_ID, actorRef);
1150     }
1151
1152     private MockRaftActorContext createActorContextWithFollower() {
1153         MockRaftActorContext actorContext = createActorContext();
1154         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1155                 followerActor.path().toString()).build());
1156         return actorContext;
1157     }
1158
1159     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1160         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1161         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1162         configParams.setElectionTimeoutFactor(100000);
1163         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1164         context.setConfigParams(configParams);
1165         context.setPayloadVersion(payloadVersion);
1166         return context;
1167     }
1168
1169     private MockRaftActorContext createFollowerActorContextWithLeader() {
1170         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1171         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1172         followerConfig.setElectionTimeoutFactor(10000);
1173         followerActorContext.setConfigParams(followerConfig);
1174         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1175         return followerActorContext;
1176     }
1177
1178     @Test
1179     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1180         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1181
1182         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1183
1184         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1185
1186         Follower follower = new Follower(followerActorContext);
1187         followerActor.underlyingActor().setBehavior(follower);
1188
1189         Map<String, String> peerAddresses = new HashMap<>();
1190         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1191
1192         leaderActorContext.setPeerAddresses(peerAddresses);
1193
1194         leaderActorContext.getReplicatedLog().removeFrom(0);
1195
1196         //create 3 entries
1197         leaderActorContext.setReplicatedLog(
1198                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1199
1200         leaderActorContext.setCommitIndex(1);
1201
1202         followerActorContext.getReplicatedLog().removeFrom(0);
1203
1204         // follower too has the exact same log entries and has the same commit index
1205         followerActorContext.setReplicatedLog(
1206                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1207
1208         followerActorContext.setCommitIndex(1);
1209
1210         leader = new Leader(leaderActorContext);
1211
1212         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1213
1214         assertEquals(1, appendEntries.getLeaderCommit());
1215         assertEquals(0, appendEntries.getEntries().size());
1216         assertEquals(0, appendEntries.getPrevLogIndex());
1217
1218         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1219                 leaderActor, AppendEntriesReply.class);
1220
1221         assertEquals(2, appendEntriesReply.getLogLastIndex());
1222         assertEquals(1, appendEntriesReply.getLogLastTerm());
1223
1224         // follower returns its next index
1225         assertEquals(2, appendEntriesReply.getLogLastIndex());
1226         assertEquals(1, appendEntriesReply.getLogLastTerm());
1227
1228         follower.close();
1229     }
1230
1231     @Test
1232     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1233         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1234
1235         MockRaftActorContext leaderActorContext = createActorContext();
1236
1237         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1238         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1239
1240         Follower follower = new Follower(followerActorContext);
1241         followerActor.underlyingActor().setBehavior(follower);
1242
1243         Map<String, String> leaderPeerAddresses = new HashMap<>();
1244         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1245
1246         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1247
1248         leaderActorContext.getReplicatedLog().removeFrom(0);
1249
1250         leaderActorContext.setReplicatedLog(
1251                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1252
1253         leaderActorContext.setCommitIndex(1);
1254
1255         followerActorContext.getReplicatedLog().removeFrom(0);
1256
1257         followerActorContext.setReplicatedLog(
1258                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1259
1260         // follower has the same log entries but its commit index > leaders commit index
1261         followerActorContext.setCommitIndex(2);
1262
1263         leader = new Leader(leaderActorContext);
1264
1265         // Initial heartbeat
1266         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1267
1268         assertEquals(1, appendEntries.getLeaderCommit());
1269         assertEquals(0, appendEntries.getEntries().size());
1270         assertEquals(0, appendEntries.getPrevLogIndex());
1271
1272         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1273                 leaderActor, AppendEntriesReply.class);
1274
1275         assertEquals(2, appendEntriesReply.getLogLastIndex());
1276         assertEquals(1, appendEntriesReply.getLogLastTerm());
1277
1278         leaderActor.underlyingActor().setBehavior(follower);
1279         leader.handleMessage(followerActor, appendEntriesReply);
1280
1281         leaderActor.underlyingActor().clear();
1282         followerActor.underlyingActor().clear();
1283
1284         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1285                 TimeUnit.MILLISECONDS);
1286
1287         leader.handleMessage(leaderActor, new SendHeartBeat());
1288
1289         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1290
1291         assertEquals(2, appendEntries.getLeaderCommit());
1292         assertEquals(0, appendEntries.getEntries().size());
1293         assertEquals(2, appendEntries.getPrevLogIndex());
1294
1295         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1296
1297         assertEquals(2, appendEntriesReply.getLogLastIndex());
1298         assertEquals(1, appendEntriesReply.getLogLastTerm());
1299
1300         assertEquals(2, followerActorContext.getCommitIndex());
1301
1302         follower.close();
1303     }
1304
1305     @Test
1306     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1307         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1308
1309         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1310         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1311                 new FiniteDuration(1000, TimeUnit.SECONDS));
1312
1313         leaderActorContext.setReplicatedLog(
1314                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1315         long leaderCommitIndex = 2;
1316         leaderActorContext.setCommitIndex(leaderCommitIndex);
1317         leaderActorContext.setLastApplied(leaderCommitIndex);
1318
1319         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1320         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1321
1322         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1323
1324         followerActorContext.setReplicatedLog(
1325                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1326         followerActorContext.setCommitIndex(0);
1327         followerActorContext.setLastApplied(0);
1328
1329         Follower follower = new Follower(followerActorContext);
1330         followerActor.underlyingActor().setBehavior(follower);
1331
1332         leader = new Leader(leaderActorContext);
1333
1334         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1335         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1336
1337         MessageCollectorActor.clearMessages(followerActor);
1338         MessageCollectorActor.clearMessages(leaderActor);
1339
1340         // Verify initial AppendEntries sent with the leader's current commit index.
1341         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1342         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1343         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1344
1345         leaderActor.underlyingActor().setBehavior(leader);
1346
1347         leader.handleMessage(followerActor, appendEntriesReply);
1348
1349         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1350         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1351
1352         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1353         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1354         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1355
1356         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1357         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1358                 appendEntries.getEntries().get(0).getData());
1359         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1360         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1361                 appendEntries.getEntries().get(1).getData());
1362
1363         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1364         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1365
1366         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1367
1368         ApplyState applyState = applyStateList.get(0);
1369         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1370         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1371         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1372                 applyState.getReplicatedLogEntry().getData());
1373
1374         applyState = applyStateList.get(1);
1375         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1376         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1377         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1378                 applyState.getReplicatedLogEntry().getData());
1379
1380         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1381         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1382     }
1383
1384     @Test
1385     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1386         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1387
1388         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1389         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1390                 new FiniteDuration(1000, TimeUnit.SECONDS));
1391
1392         leaderActorContext.setReplicatedLog(
1393                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1394         long leaderCommitIndex = 1;
1395         leaderActorContext.setCommitIndex(leaderCommitIndex);
1396         leaderActorContext.setLastApplied(leaderCommitIndex);
1397
1398         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1399         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1400
1401         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1402
1403         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1404         followerActorContext.setCommitIndex(-1);
1405         followerActorContext.setLastApplied(-1);
1406
1407         Follower follower = new Follower(followerActorContext);
1408         followerActor.underlyingActor().setBehavior(follower);
1409
1410         leader = new Leader(leaderActorContext);
1411
1412         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1413         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1414
1415         MessageCollectorActor.clearMessages(followerActor);
1416         MessageCollectorActor.clearMessages(leaderActor);
1417
1418         // Verify initial AppendEntries sent with the leader's current commit index.
1419         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1420         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1421         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1422
1423         leaderActor.underlyingActor().setBehavior(leader);
1424
1425         leader.handleMessage(followerActor, appendEntriesReply);
1426
1427         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1428         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1429
1430         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1431         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1432         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1433
1434         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1435         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1436                 appendEntries.getEntries().get(0).getData());
1437         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1438         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1439                 appendEntries.getEntries().get(1).getData());
1440
1441         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1442         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1443
1444         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1445
1446         ApplyState applyState = applyStateList.get(0);
1447         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1448         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1449         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1450                 applyState.getReplicatedLogEntry().getData());
1451
1452         applyState = applyStateList.get(1);
1453         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1454         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1455         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1456                 applyState.getReplicatedLogEntry().getData());
1457
1458         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1459         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1460     }
1461
1462     @Test
1463     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1464         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1465
1466         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1467         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1468                 new FiniteDuration(1000, TimeUnit.SECONDS));
1469
1470         leaderActorContext.setReplicatedLog(
1471                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1472         long leaderCommitIndex = 1;
1473         leaderActorContext.setCommitIndex(leaderCommitIndex);
1474         leaderActorContext.setLastApplied(leaderCommitIndex);
1475
1476         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1477         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1478
1479         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1480
1481         followerActorContext.setReplicatedLog(
1482                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1483         followerActorContext.setCommitIndex(-1);
1484         followerActorContext.setLastApplied(-1);
1485
1486         Follower follower = new Follower(followerActorContext);
1487         followerActor.underlyingActor().setBehavior(follower);
1488
1489         leader = new Leader(leaderActorContext);
1490
1491         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1492         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1493
1494         MessageCollectorActor.clearMessages(followerActor);
1495         MessageCollectorActor.clearMessages(leaderActor);
1496
1497         // Verify initial AppendEntries sent with the leader's current commit index.
1498         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1499         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1500         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1501
1502         leaderActor.underlyingActor().setBehavior(leader);
1503
1504         leader.handleMessage(followerActor, appendEntriesReply);
1505
1506         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1507         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1508
1509         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1510         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1511         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1512
1513         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1514         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1515         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1516                 appendEntries.getEntries().get(0).getData());
1517         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1518         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1519         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1520                 appendEntries.getEntries().get(1).getData());
1521
1522         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1523         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1524
1525         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1526
1527         ApplyState applyState = applyStateList.get(0);
1528         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1529         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1530         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1531                 applyState.getReplicatedLogEntry().getData());
1532
1533         applyState = applyStateList.get(1);
1534         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1535         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1536         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1537                 applyState.getReplicatedLogEntry().getData());
1538
1539         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1540         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1541         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1542     }
1543
1544     @Test
1545     public void testHandleAppendEntriesReplyWithNewerTerm(){
1546         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1547
1548         MockRaftActorContext leaderActorContext = createActorContext();
1549         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1550                 new FiniteDuration(10000, TimeUnit.SECONDS));
1551
1552         leaderActorContext.setReplicatedLog(
1553                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1554
1555         leader = new Leader(leaderActorContext);
1556         leaderActor.underlyingActor().setBehavior(leader);
1557         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1558
1559         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1560
1561         assertEquals(false, appendEntriesReply.isSuccess());
1562         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1563
1564         MessageCollectorActor.clearMessages(leaderActor);
1565     }
1566
1567     @Test
1568     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1569         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1570
1571         MockRaftActorContext leaderActorContext = createActorContext();
1572         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1573                 new FiniteDuration(10000, TimeUnit.SECONDS));
1574
1575         leaderActorContext.setReplicatedLog(
1576                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1577         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1578
1579         leader = new Leader(leaderActorContext);
1580         leaderActor.underlyingActor().setBehavior(leader);
1581         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1582
1583         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1584
1585         assertEquals(false, appendEntriesReply.isSuccess());
1586         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1587
1588         MessageCollectorActor.clearMessages(leaderActor);
1589     }
1590
1591     @Test
1592     public void testHandleAppendEntriesReplySuccess() throws Exception {
1593         logStart("testHandleAppendEntriesReplySuccess");
1594
1595         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1596
1597         leaderActorContext.setReplicatedLog(
1598                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1599
1600         leaderActorContext.setCommitIndex(1);
1601         leaderActorContext.setLastApplied(1);
1602         leaderActorContext.getTermInformation().update(1, "leader");
1603
1604         leader = new Leader(leaderActorContext);
1605
1606         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1607
1608         short payloadVersion = 5;
1609         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1610
1611         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1612
1613         assertEquals(RaftState.Leader, raftActorBehavior.state());
1614
1615         assertEquals(2, leaderActorContext.getCommitIndex());
1616
1617         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1618                 leaderActor, ApplyJournalEntries.class);
1619
1620         assertEquals(2, leaderActorContext.getLastApplied());
1621
1622         assertEquals(2, applyJournalEntries.getToIndex());
1623
1624         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1625                 ApplyState.class);
1626
1627         assertEquals(1,applyStateList.size());
1628
1629         ApplyState applyState = applyStateList.get(0);
1630
1631         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1632
1633         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1634         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1635     }
1636
1637     @Test
1638     public void testHandleAppendEntriesReplyUnknownFollower(){
1639         logStart("testHandleAppendEntriesReplyUnknownFollower");
1640
1641         MockRaftActorContext leaderActorContext = createActorContext();
1642
1643         leader = new Leader(leaderActorContext);
1644
1645         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1646
1647         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1648
1649         assertEquals(RaftState.Leader, raftActorBehavior.state());
1650     }
1651
1652     @Test
1653     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1654         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1655
1656         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1657         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1658                 new FiniteDuration(1000, TimeUnit.SECONDS));
1659         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1660
1661         leaderActorContext.setReplicatedLog(
1662                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1663         long leaderCommitIndex = 3;
1664         leaderActorContext.setCommitIndex(leaderCommitIndex);
1665         leaderActorContext.setLastApplied(leaderCommitIndex);
1666
1667         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1668         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1669         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1670         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1671
1672         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1673
1674         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1675         followerActorContext.setCommitIndex(-1);
1676         followerActorContext.setLastApplied(-1);
1677
1678         Follower follower = new Follower(followerActorContext);
1679         followerActor.underlyingActor().setBehavior(follower);
1680
1681         leader = new Leader(leaderActorContext);
1682
1683         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1684         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1685
1686         MessageCollectorActor.clearMessages(followerActor);
1687         MessageCollectorActor.clearMessages(leaderActor);
1688
1689         // Verify initial AppendEntries sent with the leader's current commit index.
1690         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1691         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1692         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1693
1694         leaderActor.underlyingActor().setBehavior(leader);
1695
1696         leader.handleMessage(followerActor, appendEntriesReply);
1697
1698         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1699         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1700
1701         appendEntries = appendEntriesList.get(0);
1702         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1703         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1704         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1705
1706         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1707         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1708                 appendEntries.getEntries().get(0).getData());
1709         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1710         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1711                 appendEntries.getEntries().get(1).getData());
1712
1713         appendEntries = appendEntriesList.get(1);
1714         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1715         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1716         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1717
1718         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1719         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1720                 appendEntries.getEntries().get(0).getData());
1721         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1722         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1723                 appendEntries.getEntries().get(1).getData());
1724
1725         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1726         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1727
1728         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1729
1730         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1731         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1732     }
1733
1734     @Test
1735     public void testHandleRequestVoteReply(){
1736         logStart("testHandleRequestVoteReply");
1737
1738         MockRaftActorContext leaderActorContext = createActorContext();
1739
1740         leader = new Leader(leaderActorContext);
1741
1742         // Should be a no-op.
1743         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1744                 new RequestVoteReply(1, true));
1745
1746         assertEquals(RaftState.Leader, raftActorBehavior.state());
1747
1748         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1749
1750         assertEquals(RaftState.Leader, raftActorBehavior.state());
1751     }
1752
1753     @Test
1754     public void testIsolatedLeaderCheckNoFollowers() {
1755         logStart("testIsolatedLeaderCheckNoFollowers");
1756
1757         MockRaftActorContext leaderActorContext = createActorContext();
1758
1759         leader = new Leader(leaderActorContext);
1760         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1761         Assert.assertTrue(behavior instanceof Leader);
1762     }
1763
1764     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1765         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1766         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1767
1768         MockRaftActorContext leaderActorContext = createActorContext();
1769
1770         Map<String, String> peerAddresses = new HashMap<>();
1771         peerAddresses.put("follower-1", followerActor1.path().toString());
1772         peerAddresses.put("follower-2", followerActor2.path().toString());
1773
1774         leaderActorContext.setPeerAddresses(peerAddresses);
1775         leaderActorContext.setRaftPolicy(raftPolicy);
1776
1777         leader = new Leader(leaderActorContext);
1778
1779         leader.markFollowerActive("follower-1");
1780         leader.markFollowerActive("follower-2");
1781         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1782         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1783                 behavior instanceof Leader);
1784
1785         // kill 1 follower and verify if that got killed
1786         final JavaTestKit probe = new JavaTestKit(getSystem());
1787         probe.watch(followerActor1);
1788         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1789         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1790         assertEquals(termMsg1.getActor(), followerActor1);
1791
1792         leader.markFollowerInActive("follower-1");
1793         leader.markFollowerActive("follower-2");
1794         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1795         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1796                 behavior instanceof Leader);
1797
1798         // kill 2nd follower and leader should change to Isolated leader
1799         followerActor2.tell(PoisonPill.getInstance(), null);
1800         probe.watch(followerActor2);
1801         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1802         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1803         assertEquals(termMsg2.getActor(), followerActor2);
1804
1805         leader.markFollowerInActive("follower-2");
1806         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1807     }
1808
1809     @Test
1810     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1811         logStart("testIsolatedLeaderCheckTwoFollowers");
1812
1813         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1814
1815         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1816             behavior instanceof IsolatedLeader);
1817     }
1818
1819     @Test
1820     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1821         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1822
1823         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1824
1825         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1826                 behavior instanceof Leader);
1827     }
1828
1829     @Test
1830     public void testLaggingFollowerStarvation() throws Exception {
1831         logStart("testLaggingFollowerStarvation");
1832         new JavaTestKit(getSystem()) {{
1833             String leaderActorId = actorFactory.generateActorId("leader");
1834             String follower1ActorId = actorFactory.generateActorId("follower");
1835             String follower2ActorId = actorFactory.generateActorId("follower");
1836
1837             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1838                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1839             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1840             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1841
1842             MockRaftActorContext leaderActorContext =
1843                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1844
1845             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1846             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1847             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1848
1849             leaderActorContext.setConfigParams(configParams);
1850
1851             leaderActorContext.setReplicatedLog(
1852                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1853
1854             Map<String, String> peerAddresses = new HashMap<>();
1855             peerAddresses.put(follower1ActorId,
1856                     follower1Actor.path().toString());
1857             peerAddresses.put(follower2ActorId,
1858                     follower2Actor.path().toString());
1859
1860             leaderActorContext.setPeerAddresses(peerAddresses);
1861             leaderActorContext.getTermInformation().update(1, leaderActorId);
1862
1863             RaftActorBehavior leader = createBehavior(leaderActorContext);
1864
1865             leaderActor.underlyingActor().setBehavior(leader);
1866
1867             for(int i=1;i<6;i++) {
1868                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1869                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1870                 assertTrue(newBehavior == leader);
1871                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1872             }
1873
1874             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1875             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1876
1877             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1878                     heartbeats.size() > 1);
1879
1880             // Check if follower-2 got AppendEntries during this time and was not starved
1881             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1882
1883             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1884                     appendEntries.size() > 1);
1885
1886         }};
1887     }
1888
1889     @Test
1890     public void testReplicationConsensusWithNonVotingFollower() {
1891         logStart("testReplicationConsensusWithNonVotingFollower");
1892
1893         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1894         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1895                 new FiniteDuration(1000, TimeUnit.SECONDS));
1896
1897         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1898
1899         String nonVotingFollowerId = "nonvoting-follower";
1900         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
1901                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
1902
1903         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
1904
1905         leader = new Leader(leaderActorContext);
1906
1907         // Ignore initial heartbeats
1908         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1909         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1910
1911         MessageCollectorActor.clearMessages(followerActor);
1912         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1913         MessageCollectorActor.clearMessages(leaderActor);
1914
1915         // Send a Replicate message and wait for AppendEntries.
1916         sendReplicate(leaderActorContext, 0);
1917
1918         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1919         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
1920
1921         // Send reply only from the voting follower and verify consensus via ApplyState.
1922         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1923
1924         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1925
1926         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
1927
1928         MessageCollectorActor.clearMessages(followerActor);
1929         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
1930         MessageCollectorActor.clearMessages(leaderActor);
1931
1932         // Send another Replicate message
1933         sendReplicate(leaderActorContext, 1);
1934
1935         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1936         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
1937                 AppendEntries.class);
1938         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
1939         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
1940
1941         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
1942         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
1943
1944         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
1945
1946         // Send reply from the voting follower and verify consensus.
1947         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1948
1949         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1950     }
1951
1952     @Test
1953     public void testTransferLeadershipWithFollowerInSync() {
1954         logStart("testTransferLeadershipWithFollowerInSync");
1955
1956         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1957         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1958                 new FiniteDuration(1000, TimeUnit.SECONDS));
1959         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1960
1961         leader = new Leader(leaderActorContext);
1962
1963         // Initial heartbeat
1964         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1965         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
1966         MessageCollectorActor.clearMessages(followerActor);
1967
1968         sendReplicate(leaderActorContext, 0);
1969         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1970
1971         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1972         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
1973         MessageCollectorActor.clearMessages(followerActor);
1974
1975         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
1976         doNothing().when(mockTransferCohort).transferComplete();
1977         leader.transferLeadership(mockTransferCohort);
1978
1979         verify(mockTransferCohort, never()).transferComplete();
1980         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1981         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
1982
1983         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
1984         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1985
1986         // Leader should force an election timeout
1987         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
1988
1989         verify(mockTransferCohort).transferComplete();
1990     }
1991
1992     @Test
1993     public void testTransferLeadershipWithEmptyLog() {
1994         logStart("testTransferLeadershipWithEmptyLog");
1995
1996         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1997         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1998                 new FiniteDuration(1000, TimeUnit.SECONDS));
1999         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2000
2001         leader = new Leader(leaderActorContext);
2002
2003         // Initial heartbeat
2004         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2005         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2006         MessageCollectorActor.clearMessages(followerActor);
2007
2008         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2009         doNothing().when(mockTransferCohort).transferComplete();
2010         leader.transferLeadership(mockTransferCohort);
2011
2012         verify(mockTransferCohort, never()).transferComplete();
2013         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2014         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2015
2016         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2017         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2018
2019         // Leader should force an election timeout
2020         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2021
2022         verify(mockTransferCohort).transferComplete();
2023     }
2024
2025     @Test
2026     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2027         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2028
2029         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2030         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2031                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2032
2033         leader = new Leader(leaderActorContext);
2034
2035         // Initial heartbeat
2036         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2037         MessageCollectorActor.clearMessages(followerActor);
2038
2039         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2040         doNothing().when(mockTransferCohort).transferComplete();
2041         leader.transferLeadership(mockTransferCohort);
2042
2043         verify(mockTransferCohort, never()).transferComplete();
2044
2045         // Sync up the follower.
2046         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2047         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2048         MessageCollectorActor.clearMessages(followerActor);
2049
2050         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2051                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2052         leader.handleMessage(leaderActor, new SendHeartBeat());
2053         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2054         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2055
2056         // Leader should force an election timeout
2057         MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
2058
2059         verify(mockTransferCohort).transferComplete();
2060     }
2061
2062     @Test
2063     public void testTransferLeadershipWithFollowerSyncTimeout() {
2064         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2065
2066         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2067         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2068                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2069         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2070         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2071
2072         leader = new Leader(leaderActorContext);
2073
2074         // Initial heartbeat
2075         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2076         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2077         MessageCollectorActor.clearMessages(followerActor);
2078
2079         sendReplicate(leaderActorContext, 0);
2080         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2081
2082         MessageCollectorActor.clearMessages(followerActor);
2083
2084         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2085         doNothing().when(mockTransferCohort).abortTransfer();
2086         leader.transferLeadership(mockTransferCohort);
2087
2088         verify(mockTransferCohort, never()).transferComplete();
2089
2090         // Send heartbeats to time out the transfer.
2091         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2092             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
2093                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2094             leader.handleMessage(leaderActor, new SendHeartBeat());
2095         }
2096
2097         verify(mockTransferCohort).abortTransfer();
2098         verify(mockTransferCohort, never()).transferComplete();
2099         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2100     }
2101
2102     @Override
2103     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
2104             ActorRef actorRef, RaftRPC rpc) throws Exception {
2105         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2106         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2107     }
2108
2109     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2110
2111         private final long electionTimeOutIntervalMillis;
2112         private final int snapshotChunkSize;
2113
2114         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
2115             super();
2116             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2117             this.snapshotChunkSize = snapshotChunkSize;
2118         }
2119
2120         @Override
2121         public FiniteDuration getElectionTimeOutInterval() {
2122             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2123         }
2124
2125         @Override
2126         public int getSnapshotChunkSize() {
2127             return snapshotChunkSize;
2128         }
2129     }
2130 }