BUG 2185 : Disable all internal switching of behavior
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.actor.Terminated;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.google.protobuf.ByteString;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
38 import org.opendaylight.controller.cluster.raft.SerializationUtils;
39 import org.opendaylight.controller.cluster.raft.Snapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
46 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
49 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
50 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
51 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
53 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
54 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
55 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
56 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import scala.concurrent.duration.FiniteDuration;
59
60 public class LeaderTest extends AbstractLeaderTest {
61
62     static final String FOLLOWER_ID = "follower";
63     public static final String LEADER_ID = "leader";
64
65     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
66             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
67
68     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
69             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
70
71     private Leader leader;
72     private final short payloadVersion = 5;
73
74     @Override
75     @After
76     public void tearDown() throws Exception {
77         if(leader != null) {
78             leader.close();
79         }
80
81         super.tearDown();
82     }
83
84     @Test
85     public void testHandleMessageForUnknownMessage() throws Exception {
86         logStart("testHandleMessageForUnknownMessage");
87
88         leader = new Leader(createActorContext());
89
90         // handle message should return the Leader state when it receives an
91         // unknown message
92         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
93         Assert.assertTrue(behavior instanceof Leader);
94     }
95
96     @Test
97     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
98         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
99
100         MockRaftActorContext actorContext = createActorContextWithFollower();
101         short payloadVersion = (short)5;
102         actorContext.setPayloadVersion(payloadVersion);
103
104         long term = 1;
105         actorContext.getTermInformation().update(term, "");
106
107         leader = new Leader(actorContext);
108
109         // Leader should send an immediate heartbeat with no entries as follower is inactive.
110         long lastIndex = actorContext.getReplicatedLog().lastIndex();
111         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
112         assertEquals("getTerm", term, appendEntries.getTerm());
113         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
114         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
115         assertEquals("Entries size", 0, appendEntries.getEntries().size());
116         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
117
118         // The follower would normally reply - simulate that explicitly here.
119         leader.handleMessage(followerActor, new AppendEntriesReply(
120                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
121         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
122
123         followerActor.underlyingActor().clear();
124
125         // Sleep for the heartbeat interval so AppendEntries is sent.
126         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
127                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
128
129         leader.handleMessage(leaderActor, new SendHeartBeat());
130
131         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
132         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
133         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
134         assertEquals("Entries size", 1, appendEntries.getEntries().size());
135         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
136         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
137         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
138     }
139
140
141     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
142         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
143         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
144                 1, index, payload);
145         actorContext.getReplicatedLog().append(newEntry);
146         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
147     }
148
149     @Test
150     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
151         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
152
153         MockRaftActorContext actorContext = createActorContextWithFollower();
154
155         long term = 1;
156         actorContext.getTermInformation().update(term, "");
157
158         leader = new Leader(actorContext);
159
160         // Leader will send an immediate heartbeat - ignore it.
161         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
162
163         // The follower would normally reply - simulate that explicitly here.
164         long lastIndex = actorContext.getReplicatedLog().lastIndex();
165         leader.handleMessage(followerActor, new AppendEntriesReply(
166                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
167         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
168
169         followerActor.underlyingActor().clear();
170
171         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
172
173         // State should not change
174         assertTrue(raftBehavior instanceof Leader);
175
176         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
177         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
178         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
179         assertEquals("Entries size", 1, appendEntries.getEntries().size());
180         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
181         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
182         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
183         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
184     }
185
186     @Test
187     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
188         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
189
190         MockRaftActorContext actorContext = createActorContextWithFollower();
191         actorContext.setRaftPolicy(createRaftPolicy(true, true));
192
193         long term = 1;
194         actorContext.getTermInformation().update(term, "");
195
196         leader = new Leader(actorContext);
197
198         // Leader will send an immediate heartbeat - ignore it.
199         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
200
201         // The follower would normally reply - simulate that explicitly here.
202         long lastIndex = actorContext.getReplicatedLog().lastIndex();
203         leader.handleMessage(followerActor, new AppendEntriesReply(
204                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
205         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
206
207         followerActor.underlyingActor().clear();
208
209         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
210
211         // State should not change
212         assertTrue(raftBehavior instanceof Leader);
213
214         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
215         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
216         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
217         assertEquals("Entries size", 1, appendEntries.getEntries().size());
218         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
219         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
220         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
221         assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
222     }
223
224     @Test
225     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
226         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
227
228         MockRaftActorContext actorContext = createActorContextWithFollower();
229         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
230             @Override
231             public FiniteDuration getHeartBeatInterval() {
232                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
233             }
234         });
235
236         long term = 1;
237         actorContext.getTermInformation().update(term, "");
238
239         leader = new Leader(actorContext);
240
241         // Leader will send an immediate heartbeat - ignore it.
242         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
243
244         // The follower would normally reply - simulate that explicitly here.
245         long lastIndex = actorContext.getReplicatedLog().lastIndex();
246         leader.handleMessage(followerActor, new AppendEntriesReply(
247                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
248         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
249
250         followerActor.underlyingActor().clear();
251
252         for(int i=0;i<5;i++) {
253             sendReplicate(actorContext, lastIndex+i+1);
254         }
255
256         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
257         // We expect only 1 message to be sent because of two reasons,
258         // - an append entries reply was not received
259         // - the heartbeat interval has not expired
260         // In this scenario if multiple messages are sent they would likely be duplicates
261         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
262     }
263
264     @Test
265     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
266         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
267
268         MockRaftActorContext actorContext = createActorContextWithFollower();
269         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
270             @Override
271             public FiniteDuration getHeartBeatInterval() {
272                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
273             }
274         });
275
276         long term = 1;
277         actorContext.getTermInformation().update(term, "");
278
279         leader = new Leader(actorContext);
280
281         // Leader will send an immediate heartbeat - ignore it.
282         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283
284         // The follower would normally reply - simulate that explicitly here.
285         long lastIndex = actorContext.getReplicatedLog().lastIndex();
286         leader.handleMessage(followerActor, new AppendEntriesReply(
287                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
288         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
289
290         followerActor.underlyingActor().clear();
291
292         for(int i=0;i<3;i++) {
293             sendReplicate(actorContext, lastIndex+i+1);
294             leader.handleMessage(followerActor, new AppendEntriesReply(
295                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
296
297         }
298
299         for(int i=3;i<5;i++) {
300             sendReplicate(actorContext, lastIndex + i + 1);
301         }
302
303         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
304         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
305         // get sent to the follower - but not the 5th
306         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
307
308         for(int i=0;i<4;i++) {
309             long expected = allMessages.get(i).getEntries().get(0).getIndex();
310             assertEquals(expected, i+2);
311         }
312     }
313
314     @Test
315     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
316         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
317
318         MockRaftActorContext actorContext = createActorContextWithFollower();
319         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
320             @Override
321             public FiniteDuration getHeartBeatInterval() {
322                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
323             }
324         });
325
326         long term = 1;
327         actorContext.getTermInformation().update(term, "");
328
329         leader = new Leader(actorContext);
330
331         // Leader will send an immediate heartbeat - ignore it.
332         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
333
334         // The follower would normally reply - simulate that explicitly here.
335         long lastIndex = actorContext.getReplicatedLog().lastIndex();
336         leader.handleMessage(followerActor, new AppendEntriesReply(
337                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
338         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
339
340         followerActor.underlyingActor().clear();
341
342         sendReplicate(actorContext, lastIndex+1);
343
344         // Wait slightly longer than heartbeat duration
345         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
346
347         leader.handleMessage(leaderActor, new SendHeartBeat());
348
349         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
350         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
351
352         assertEquals(1, allMessages.get(0).getEntries().size());
353         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
354         assertEquals(1, allMessages.get(1).getEntries().size());
355         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
356
357     }
358
359     @Test
360     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
361         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
362
363         MockRaftActorContext actorContext = createActorContextWithFollower();
364         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
365             @Override
366             public FiniteDuration getHeartBeatInterval() {
367                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
368             }
369         });
370
371         long term = 1;
372         actorContext.getTermInformation().update(term, "");
373
374         leader = new Leader(actorContext);
375
376         // Leader will send an immediate heartbeat - ignore it.
377         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
378
379         // The follower would normally reply - simulate that explicitly here.
380         long lastIndex = actorContext.getReplicatedLog().lastIndex();
381         leader.handleMessage(followerActor, new AppendEntriesReply(
382                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
383         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
384
385         followerActor.underlyingActor().clear();
386
387         for(int i=0;i<3;i++) {
388             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
389             leader.handleMessage(leaderActor, new SendHeartBeat());
390         }
391
392         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
393         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
394     }
395
396     @Test
397     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
398         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
399
400         MockRaftActorContext actorContext = createActorContextWithFollower();
401         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
402             @Override
403             public FiniteDuration getHeartBeatInterval() {
404                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
405             }
406         });
407
408         long term = 1;
409         actorContext.getTermInformation().update(term, "");
410
411         leader = new Leader(actorContext);
412
413         // Leader will send an immediate heartbeat - ignore it.
414         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
415
416         // The follower would normally reply - simulate that explicitly here.
417         long lastIndex = actorContext.getReplicatedLog().lastIndex();
418         leader.handleMessage(followerActor, new AppendEntriesReply(
419                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
421
422         followerActor.underlyingActor().clear();
423
424         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
425         leader.handleMessage(leaderActor, new SendHeartBeat());
426         sendReplicate(actorContext, lastIndex+1);
427
428         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
429         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
430
431         assertEquals(0, allMessages.get(0).getEntries().size());
432         assertEquals(1, allMessages.get(1).getEntries().size());
433     }
434
435
436     @Test
437     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
438         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
439
440         MockRaftActorContext actorContext = createActorContext();
441
442         leader = new Leader(actorContext);
443
444         actorContext.setLastApplied(0);
445
446         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
447         long term = actorContext.getTermInformation().getCurrentTerm();
448         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
449                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
450
451         actorContext.getReplicatedLog().append(newEntry);
452
453         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
454                 new Replicate(leaderActor, "state-id", newEntry));
455
456         // State should not change
457         assertTrue(raftBehavior instanceof Leader);
458
459         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
460
461         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
462         // one since lastApplied state is 0.
463         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
464                 leaderActor, ApplyState.class);
465         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
466
467         for(int i = 0; i <= newLogIndex - 1; i++ ) {
468             ApplyState applyState = applyStateList.get(i);
469             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
470             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
471         }
472
473         ApplyState last = applyStateList.get((int) newLogIndex - 1);
474         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
475         assertEquals("getIdentifier", "state-id", last.getIdentifier());
476     }
477
478     @Test
479     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
480         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
481
482         MockRaftActorContext actorContext = createActorContextWithFollower();
483
484         Map<String, String> leadersSnapshot = new HashMap<>();
485         leadersSnapshot.put("1", "A");
486         leadersSnapshot.put("2", "B");
487         leadersSnapshot.put("3", "C");
488
489         //clears leaders log
490         actorContext.getReplicatedLog().removeFrom(0);
491
492         final int commitIndex = 3;
493         final int snapshotIndex = 2;
494         final int newEntryIndex = 4;
495         final int snapshotTerm = 1;
496         final int currentTerm = 2;
497
498         // set the snapshot variables in replicatedlog
499         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
500         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
501         actorContext.setCommitIndex(commitIndex);
502         //set follower timeout to 2 mins, helps during debugging
503         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
504
505         leader = new Leader(actorContext);
506
507         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
508         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
509
510         // new entry
511         ReplicatedLogImplEntry entry =
512                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
513                         new MockRaftActorContext.MockPayload("D"));
514
515         //update follower timestamp
516         leader.markFollowerActive(FOLLOWER_ID);
517
518         ByteString bs = toByteString(leadersSnapshot);
519         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
520                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
521         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
522         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
523
524         //send first chunk and no InstallSnapshotReply received yet
525         fts.getNextChunk();
526         fts.incrementChunkIndex();
527
528         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
529                 TimeUnit.MILLISECONDS);
530
531         leader.handleMessage(leaderActor, new SendHeartBeat());
532
533         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
534
535         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
536
537         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
538
539         //InstallSnapshotReply received
540         fts.markSendStatus(true);
541
542         leader.handleMessage(leaderActor, new SendHeartBeat());
543
544         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
545
546         assertEquals(commitIndex, is.getLastIncludedIndex());
547     }
548
549     @Test
550     public void testSendAppendEntriesSnapshotScenario() throws Exception {
551         logStart("testSendAppendEntriesSnapshotScenario");
552
553         MockRaftActorContext actorContext = createActorContextWithFollower();
554
555         Map<String, String> leadersSnapshot = new HashMap<>();
556         leadersSnapshot.put("1", "A");
557         leadersSnapshot.put("2", "B");
558         leadersSnapshot.put("3", "C");
559
560         //clears leaders log
561         actorContext.getReplicatedLog().removeFrom(0);
562
563         final int followersLastIndex = 2;
564         final int snapshotIndex = 3;
565         final int newEntryIndex = 4;
566         final int snapshotTerm = 1;
567         final int currentTerm = 2;
568
569         // set the snapshot variables in replicatedlog
570         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
571         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
572         actorContext.setCommitIndex(followersLastIndex);
573
574         leader = new Leader(actorContext);
575
576         // Leader will send an immediate heartbeat - ignore it.
577         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
578
579         // new entry
580         ReplicatedLogImplEntry entry =
581                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
582                         new MockRaftActorContext.MockPayload("D"));
583
584         actorContext.getReplicatedLog().append(entry);
585
586         //update follower timestamp
587         leader.markFollowerActive(FOLLOWER_ID);
588
589         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
590         RaftActorBehavior raftBehavior = leader.handleMessage(
591                 leaderActor, new Replicate(null, "state-id", entry));
592
593         assertTrue(raftBehavior instanceof Leader);
594
595         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
596     }
597
598     @Test
599     public void testInitiateInstallSnapshot() throws Exception {
600         logStart("testInitiateInstallSnapshot");
601
602         MockRaftActorContext actorContext = createActorContextWithFollower();
603
604         Map<String, String> leadersSnapshot = new HashMap<>();
605         leadersSnapshot.put("1", "A");
606         leadersSnapshot.put("2", "B");
607         leadersSnapshot.put("3", "C");
608
609         //clears leaders log
610         actorContext.getReplicatedLog().removeFrom(0);
611
612         final int followersLastIndex = 2;
613         final int snapshotIndex = 3;
614         final int newEntryIndex = 4;
615         final int snapshotTerm = 1;
616         final int currentTerm = 2;
617
618         // set the snapshot variables in replicatedlog
619         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
620         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
621         actorContext.setLastApplied(3);
622         actorContext.setCommitIndex(followersLastIndex);
623
624         leader = new Leader(actorContext);
625
626         // Leader will send an immediate heartbeat - ignore it.
627         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
628
629         // set the snapshot as absent and check if capture-snapshot is invoked.
630         leader.setSnapshot(null);
631
632         // new entry
633         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
634                 new MockRaftActorContext.MockPayload("D"));
635
636         actorContext.getReplicatedLog().append(entry);
637
638         //update follower timestamp
639         leader.markFollowerActive(FOLLOWER_ID);
640
641         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
642
643         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
644
645         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
646
647         assertTrue(cs.isInstallSnapshotInitiated());
648         assertEquals(3, cs.getLastAppliedIndex());
649         assertEquals(1, cs.getLastAppliedTerm());
650         assertEquals(4, cs.getLastIndex());
651         assertEquals(2, cs.getLastTerm());
652
653         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
654         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
655
656         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
657     }
658
659     @Test
660     public void testInstallSnapshot() throws Exception {
661         logStart("testInstallSnapshot");
662
663         MockRaftActorContext actorContext = createActorContextWithFollower();
664
665         Map<String, String> leadersSnapshot = new HashMap<>();
666         leadersSnapshot.put("1", "A");
667         leadersSnapshot.put("2", "B");
668         leadersSnapshot.put("3", "C");
669
670         //clears leaders log
671         actorContext.getReplicatedLog().removeFrom(0);
672
673         final int lastAppliedIndex = 3;
674         final int snapshotIndex = 2;
675         final int snapshotTerm = 1;
676         final int currentTerm = 2;
677
678         // set the snapshot variables in replicatedlog
679         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
680         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
681         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
682         actorContext.setCommitIndex(lastAppliedIndex);
683         actorContext.setLastApplied(lastAppliedIndex);
684
685         leader = new Leader(actorContext);
686
687         // Initial heartbeat.
688         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
689
690         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
691         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
692
693         Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
694                 Collections.<ReplicatedLogEntry>emptyList(),
695                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
696
697         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
698
699         assertTrue(raftBehavior instanceof Leader);
700
701         // check if installsnapshot gets called with the correct values.
702
703         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
704
705         assertNotNull(installSnapshot.getData());
706         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
707         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
708
709         assertEquals(currentTerm, installSnapshot.getTerm());
710     }
711
712     @Test
713     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
714         logStart("testHandleInstallSnapshotReplyLastChunk");
715
716         MockRaftActorContext actorContext = createActorContextWithFollower();
717
718         final int commitIndex = 3;
719         final int snapshotIndex = 2;
720         final int snapshotTerm = 1;
721         final int currentTerm = 2;
722
723         actorContext.setCommitIndex(commitIndex);
724
725         leader = new Leader(actorContext);
726
727         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
728         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
729
730         // Ignore initial heartbeat.
731         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
732
733         Map<String, String> leadersSnapshot = new HashMap<>();
734         leadersSnapshot.put("1", "A");
735         leadersSnapshot.put("2", "B");
736         leadersSnapshot.put("3", "C");
737
738         // set the snapshot variables in replicatedlog
739
740         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
741         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
742         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
743
744         ByteString bs = toByteString(leadersSnapshot);
745         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
746                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
747         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
748         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
749         while(!fts.isLastChunk(fts.getChunkIndex())) {
750             fts.getNextChunk();
751             fts.incrementChunkIndex();
752         }
753
754         //clears leaders log
755         actorContext.getReplicatedLog().removeFrom(0);
756
757         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
758                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
759
760         assertTrue(raftBehavior instanceof Leader);
761
762         assertEquals(0, leader.followerSnapshotSize());
763         assertEquals(1, leader.followerLogSize());
764         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
765         assertNotNull(fli);
766         assertEquals(commitIndex, fli.getMatchIndex());
767         assertEquals(commitIndex + 1, fli.getNextIndex());
768     }
769
770     @Test
771     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
772         logStart("testSendSnapshotfromInstallSnapshotReply");
773
774         MockRaftActorContext actorContext = createActorContextWithFollower();
775
776         final int commitIndex = 3;
777         final int snapshotIndex = 2;
778         final int snapshotTerm = 1;
779         final int currentTerm = 2;
780
781         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
782             @Override
783             public int getSnapshotChunkSize() {
784                 return 50;
785             }
786         };
787         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
788         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
789
790         actorContext.setConfigParams(configParams);
791         actorContext.setCommitIndex(commitIndex);
792
793         leader = new Leader(actorContext);
794
795         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
796         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
797
798         Map<String, String> leadersSnapshot = new HashMap<>();
799         leadersSnapshot.put("1", "A");
800         leadersSnapshot.put("2", "B");
801         leadersSnapshot.put("3", "C");
802
803         // set the snapshot variables in replicatedlog
804         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
805         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
806         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
807
808         ByteString bs = toByteString(leadersSnapshot);
809         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
810                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
811         leader.setSnapshot(snapshot);
812
813         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
814
815         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
816
817         assertEquals(1, installSnapshot.getChunkIndex());
818         assertEquals(3, installSnapshot.getTotalChunks());
819
820         followerActor.underlyingActor().clear();
821         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
822                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
823
824         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
825
826         assertEquals(2, installSnapshot.getChunkIndex());
827         assertEquals(3, installSnapshot.getTotalChunks());
828
829         followerActor.underlyingActor().clear();
830         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
831                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
832
833         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
834
835         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
836         followerActor.underlyingActor().clear();
837         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
838                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
839
840         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
841
842         Assert.assertNull(installSnapshot);
843     }
844
845
846     @Test
847     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
848         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
849
850         MockRaftActorContext actorContext = createActorContextWithFollower();
851
852         final int commitIndex = 3;
853         final int snapshotIndex = 2;
854         final int snapshotTerm = 1;
855         final int currentTerm = 2;
856
857         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
858             @Override
859             public int getSnapshotChunkSize() {
860                 return 50;
861             }
862         });
863
864         actorContext.setCommitIndex(commitIndex);
865
866         leader = new Leader(actorContext);
867
868         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
869         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
870
871         Map<String, String> leadersSnapshot = new HashMap<>();
872         leadersSnapshot.put("1", "A");
873         leadersSnapshot.put("2", "B");
874         leadersSnapshot.put("3", "C");
875
876         // set the snapshot variables in replicatedlog
877         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
878         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
879         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
880
881         ByteString bs = toByteString(leadersSnapshot);
882         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
883                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
884         leader.setSnapshot(snapshot);
885
886         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
887         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
888
889         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
890
891         assertEquals(1, installSnapshot.getChunkIndex());
892         assertEquals(3, installSnapshot.getTotalChunks());
893
894         followerActor.underlyingActor().clear();
895
896         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
897                 FOLLOWER_ID, -1, false));
898
899         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
900                 TimeUnit.MILLISECONDS);
901
902         leader.handleMessage(leaderActor, new SendHeartBeat());
903
904         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
905
906         assertEquals(1, installSnapshot.getChunkIndex());
907         assertEquals(3, installSnapshot.getTotalChunks());
908     }
909
910     @Test
911     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
912         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
913
914         MockRaftActorContext actorContext = createActorContextWithFollower();
915
916         final int commitIndex = 3;
917         final int snapshotIndex = 2;
918         final int snapshotTerm = 1;
919         final int currentTerm = 2;
920
921         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
922             @Override
923             public int getSnapshotChunkSize() {
924                 return 50;
925             }
926         });
927
928         actorContext.setCommitIndex(commitIndex);
929
930         leader = new Leader(actorContext);
931
932         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
933         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
934
935         Map<String, String> leadersSnapshot = new HashMap<>();
936         leadersSnapshot.put("1", "A");
937         leadersSnapshot.put("2", "B");
938         leadersSnapshot.put("3", "C");
939
940         // set the snapshot variables in replicatedlog
941         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
942         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
943         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
944
945         ByteString bs = toByteString(leadersSnapshot);
946         Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
947                 commitIndex, snapshotTerm, commitIndex, snapshotTerm);
948         leader.setSnapshot(snapshot);
949
950         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
951
952         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
953
954         assertEquals(1, installSnapshot.getChunkIndex());
955         assertEquals(3, installSnapshot.getTotalChunks());
956         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
957
958         int hashCode = installSnapshot.getData().hashCode();
959
960         followerActor.underlyingActor().clear();
961
962         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
963                 FOLLOWER_ID, 1, true));
964
965         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
966
967         assertEquals(2, installSnapshot.getChunkIndex());
968         assertEquals(3, installSnapshot.getTotalChunks());
969         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
970     }
971
972     @Test
973     public void testFollowerToSnapshotLogic() {
974         logStart("testFollowerToSnapshotLogic");
975
976         MockRaftActorContext actorContext = createActorContext();
977
978         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
979             @Override
980             public int getSnapshotChunkSize() {
981                 return 50;
982             }
983         });
984
985         leader = new Leader(actorContext);
986
987         Map<String, String> leadersSnapshot = new HashMap<>();
988         leadersSnapshot.put("1", "A");
989         leadersSnapshot.put("2", "B");
990         leadersSnapshot.put("3", "C");
991
992         ByteString bs = toByteString(leadersSnapshot);
993         byte[] barray = bs.toByteArray();
994
995         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
996         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
997
998         assertEquals(bs.size(), barray.length);
999
1000         int chunkIndex=0;
1001         for (int i=0; i < barray.length; i = i + 50) {
1002             int j = i + 50;
1003             chunkIndex++;
1004
1005             if (i + 50 > barray.length) {
1006                 j = barray.length;
1007             }
1008
1009             ByteString chunk = fts.getNextChunk();
1010             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
1011             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1012
1013             fts.markSendStatus(true);
1014             if (!fts.isLastChunk(chunkIndex)) {
1015                 fts.incrementChunkIndex();
1016             }
1017         }
1018
1019         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1020     }
1021
1022     @Override protected RaftActorBehavior createBehavior(
1023         RaftActorContext actorContext) {
1024         return new Leader(actorContext);
1025     }
1026
1027     @Override
1028     protected MockRaftActorContext createActorContext() {
1029         return createActorContext(leaderActor);
1030     }
1031
1032     @Override
1033     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
1034         return createActorContext(LEADER_ID, actorRef);
1035     }
1036
1037     private MockRaftActorContext createActorContextWithFollower() {
1038         MockRaftActorContext actorContext = createActorContext();
1039         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1040                 followerActor.path().toString()).build());
1041         return actorContext;
1042     }
1043
1044     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
1045         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1046         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1047         configParams.setElectionTimeoutFactor(100000);
1048         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1049         context.setConfigParams(configParams);
1050         context.setPayloadVersion(payloadVersion);
1051         return context;
1052     }
1053
1054     private MockRaftActorContext createFollowerActorContextWithLeader() {
1055         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1056         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1057         followerConfig.setElectionTimeoutFactor(10000);
1058         followerActorContext.setConfigParams(followerConfig);
1059         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1060         return followerActorContext;
1061     }
1062
1063     @Test
1064     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
1065         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1066
1067         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1068
1069         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1070
1071         Follower follower = new Follower(followerActorContext);
1072         followerActor.underlyingActor().setBehavior(follower);
1073
1074         Map<String, String> peerAddresses = new HashMap<>();
1075         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1076
1077         leaderActorContext.setPeerAddresses(peerAddresses);
1078
1079         leaderActorContext.getReplicatedLog().removeFrom(0);
1080
1081         //create 3 entries
1082         leaderActorContext.setReplicatedLog(
1083                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1084
1085         leaderActorContext.setCommitIndex(1);
1086
1087         followerActorContext.getReplicatedLog().removeFrom(0);
1088
1089         // follower too has the exact same log entries and has the same commit index
1090         followerActorContext.setReplicatedLog(
1091                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1092
1093         followerActorContext.setCommitIndex(1);
1094
1095         leader = new Leader(leaderActorContext);
1096
1097         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1098
1099         assertEquals(1, appendEntries.getLeaderCommit());
1100         assertEquals(0, appendEntries.getEntries().size());
1101         assertEquals(0, appendEntries.getPrevLogIndex());
1102
1103         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1104                 leaderActor, AppendEntriesReply.class);
1105
1106         assertEquals(2, appendEntriesReply.getLogLastIndex());
1107         assertEquals(1, appendEntriesReply.getLogLastTerm());
1108
1109         // follower returns its next index
1110         assertEquals(2, appendEntriesReply.getLogLastIndex());
1111         assertEquals(1, appendEntriesReply.getLogLastTerm());
1112
1113         follower.close();
1114     }
1115
1116     @Test
1117     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1118         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1119
1120         MockRaftActorContext leaderActorContext = createActorContext();
1121
1122         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1123         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1124
1125         Follower follower = new Follower(followerActorContext);
1126         followerActor.underlyingActor().setBehavior(follower);
1127
1128         Map<String, String> leaderPeerAddresses = new HashMap<>();
1129         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1130
1131         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1132
1133         leaderActorContext.getReplicatedLog().removeFrom(0);
1134
1135         leaderActorContext.setReplicatedLog(
1136                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1137
1138         leaderActorContext.setCommitIndex(1);
1139
1140         followerActorContext.getReplicatedLog().removeFrom(0);
1141
1142         followerActorContext.setReplicatedLog(
1143                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1144
1145         // follower has the same log entries but its commit index > leaders commit index
1146         followerActorContext.setCommitIndex(2);
1147
1148         leader = new Leader(leaderActorContext);
1149
1150         // Initial heartbeat
1151         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1152
1153         assertEquals(1, appendEntries.getLeaderCommit());
1154         assertEquals(0, appendEntries.getEntries().size());
1155         assertEquals(0, appendEntries.getPrevLogIndex());
1156
1157         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1158                 leaderActor, AppendEntriesReply.class);
1159
1160         assertEquals(2, appendEntriesReply.getLogLastIndex());
1161         assertEquals(1, appendEntriesReply.getLogLastTerm());
1162
1163         leaderActor.underlyingActor().setBehavior(follower);
1164         leader.handleMessage(followerActor, appendEntriesReply);
1165
1166         leaderActor.underlyingActor().clear();
1167         followerActor.underlyingActor().clear();
1168
1169         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1170                 TimeUnit.MILLISECONDS);
1171
1172         leader.handleMessage(leaderActor, new SendHeartBeat());
1173
1174         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1175
1176         assertEquals(2, appendEntries.getLeaderCommit());
1177         assertEquals(0, appendEntries.getEntries().size());
1178         assertEquals(2, appendEntries.getPrevLogIndex());
1179
1180         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1181
1182         assertEquals(2, appendEntriesReply.getLogLastIndex());
1183         assertEquals(1, appendEntriesReply.getLogLastTerm());
1184
1185         assertEquals(2, followerActorContext.getCommitIndex());
1186
1187         follower.close();
1188     }
1189
1190     @Test
1191     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1192         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1193
1194         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1195         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1196                 new FiniteDuration(1000, TimeUnit.SECONDS));
1197
1198         leaderActorContext.setReplicatedLog(
1199                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1200         long leaderCommitIndex = 2;
1201         leaderActorContext.setCommitIndex(leaderCommitIndex);
1202         leaderActorContext.setLastApplied(leaderCommitIndex);
1203
1204         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1205         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1206
1207         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1208
1209         followerActorContext.setReplicatedLog(
1210                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1211         followerActorContext.setCommitIndex(0);
1212         followerActorContext.setLastApplied(0);
1213
1214         Follower follower = new Follower(followerActorContext);
1215         followerActor.underlyingActor().setBehavior(follower);
1216
1217         leader = new Leader(leaderActorContext);
1218
1219         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1220         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1221
1222         MessageCollectorActor.clearMessages(followerActor);
1223         MessageCollectorActor.clearMessages(leaderActor);
1224
1225         // Verify initial AppendEntries sent with the leader's current commit index.
1226         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1227         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1228         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1229
1230         leaderActor.underlyingActor().setBehavior(leader);
1231
1232         leader.handleMessage(followerActor, appendEntriesReply);
1233
1234         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1235         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1236
1237         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1238         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1239         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1240
1241         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1242         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1243                 appendEntries.getEntries().get(0).getData());
1244         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1245         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1246                 appendEntries.getEntries().get(1).getData());
1247
1248         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1249         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1250
1251         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1252
1253         ApplyState applyState = applyStateList.get(0);
1254         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1255         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1256         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1257                 applyState.getReplicatedLogEntry().getData());
1258
1259         applyState = applyStateList.get(1);
1260         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1261         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1262         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1263                 applyState.getReplicatedLogEntry().getData());
1264
1265         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1266         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1267     }
1268
1269     @Test
1270     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1271         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1272
1273         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1274         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1275                 new FiniteDuration(1000, TimeUnit.SECONDS));
1276
1277         leaderActorContext.setReplicatedLog(
1278                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1279         long leaderCommitIndex = 1;
1280         leaderActorContext.setCommitIndex(leaderCommitIndex);
1281         leaderActorContext.setLastApplied(leaderCommitIndex);
1282
1283         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1284         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1285
1286         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1287
1288         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1289         followerActorContext.setCommitIndex(-1);
1290         followerActorContext.setLastApplied(-1);
1291
1292         Follower follower = new Follower(followerActorContext);
1293         followerActor.underlyingActor().setBehavior(follower);
1294
1295         leader = new Leader(leaderActorContext);
1296
1297         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1298         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1299
1300         MessageCollectorActor.clearMessages(followerActor);
1301         MessageCollectorActor.clearMessages(leaderActor);
1302
1303         // Verify initial AppendEntries sent with the leader's current commit index.
1304         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1305         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1306         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1307
1308         leaderActor.underlyingActor().setBehavior(leader);
1309
1310         leader.handleMessage(followerActor, appendEntriesReply);
1311
1312         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1313         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1314
1315         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1316         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1317         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1318
1319         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1320         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1321                 appendEntries.getEntries().get(0).getData());
1322         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1323         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1324                 appendEntries.getEntries().get(1).getData());
1325
1326         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1327         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1328
1329         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1330
1331         ApplyState applyState = applyStateList.get(0);
1332         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1333         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1334         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1335                 applyState.getReplicatedLogEntry().getData());
1336
1337         applyState = applyStateList.get(1);
1338         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1339         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1340         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1341                 applyState.getReplicatedLogEntry().getData());
1342
1343         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1344         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1345     }
1346
1347     @Test
1348     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1349         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1350
1351         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1352         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1353                 new FiniteDuration(1000, TimeUnit.SECONDS));
1354
1355         leaderActorContext.setReplicatedLog(
1356                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1357         long leaderCommitIndex = 1;
1358         leaderActorContext.setCommitIndex(leaderCommitIndex);
1359         leaderActorContext.setLastApplied(leaderCommitIndex);
1360
1361         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1362         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1363
1364         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1365
1366         followerActorContext.setReplicatedLog(
1367                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1368         followerActorContext.setCommitIndex(-1);
1369         followerActorContext.setLastApplied(-1);
1370
1371         Follower follower = new Follower(followerActorContext);
1372         followerActor.underlyingActor().setBehavior(follower);
1373
1374         leader = new Leader(leaderActorContext);
1375
1376         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1377         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1378
1379         MessageCollectorActor.clearMessages(followerActor);
1380         MessageCollectorActor.clearMessages(leaderActor);
1381
1382         // Verify initial AppendEntries sent with the leader's current commit index.
1383         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1384         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1385         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1386
1387         leaderActor.underlyingActor().setBehavior(leader);
1388
1389         leader.handleMessage(followerActor, appendEntriesReply);
1390
1391         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1392         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1393
1394         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1395         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1396         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1397
1398         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1399         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1400         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1401                 appendEntries.getEntries().get(0).getData());
1402         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1403         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1404         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1405                 appendEntries.getEntries().get(1).getData());
1406
1407         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1408         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1409
1410         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1411
1412         ApplyState applyState = applyStateList.get(0);
1413         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1414         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1415         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1416                 applyState.getReplicatedLogEntry().getData());
1417
1418         applyState = applyStateList.get(1);
1419         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1420         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1421         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1422                 applyState.getReplicatedLogEntry().getData());
1423
1424         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1425         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1426         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1427     }
1428
1429     @Test
1430     public void testHandleAppendEntriesReplyWithNewerTerm(){
1431         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1432
1433         MockRaftActorContext leaderActorContext = createActorContext();
1434         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1435                 new FiniteDuration(10000, TimeUnit.SECONDS));
1436
1437         leaderActorContext.setReplicatedLog(
1438                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1439
1440         leader = new Leader(leaderActorContext);
1441         leaderActor.underlyingActor().setBehavior(leader);
1442         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1443
1444         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1445
1446         assertEquals(false, appendEntriesReply.isSuccess());
1447         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1448
1449         MessageCollectorActor.clearMessages(leaderActor);
1450     }
1451
1452     @Test
1453     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
1454         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1455
1456         MockRaftActorContext leaderActorContext = createActorContext();
1457         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1458                 new FiniteDuration(10000, TimeUnit.SECONDS));
1459
1460         leaderActorContext.setReplicatedLog(
1461                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1462         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1463
1464         leader = new Leader(leaderActorContext);
1465         leaderActor.underlyingActor().setBehavior(leader);
1466         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1467
1468         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1469
1470         assertEquals(false, appendEntriesReply.isSuccess());
1471         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1472
1473         MessageCollectorActor.clearMessages(leaderActor);
1474     }
1475
1476     @Test
1477     public void testHandleAppendEntriesReplySuccess() throws Exception {
1478         logStart("testHandleAppendEntriesReplySuccess");
1479
1480         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1481
1482         leaderActorContext.setReplicatedLog(
1483                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1484
1485         leaderActorContext.setCommitIndex(1);
1486         leaderActorContext.setLastApplied(1);
1487         leaderActorContext.getTermInformation().update(1, "leader");
1488
1489         leader = new Leader(leaderActorContext);
1490
1491         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1492
1493         short payloadVersion = 5;
1494         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1495
1496         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1497
1498         assertEquals(RaftState.Leader, raftActorBehavior.state());
1499
1500         assertEquals(2, leaderActorContext.getCommitIndex());
1501
1502         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1503                 leaderActor, ApplyJournalEntries.class);
1504
1505         assertEquals(2, leaderActorContext.getLastApplied());
1506
1507         assertEquals(2, applyJournalEntries.getToIndex());
1508
1509         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1510                 ApplyState.class);
1511
1512         assertEquals(1,applyStateList.size());
1513
1514         ApplyState applyState = applyStateList.get(0);
1515
1516         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1517
1518         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1519         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1520     }
1521
1522     @Test
1523     public void testHandleAppendEntriesReplyUnknownFollower(){
1524         logStart("testHandleAppendEntriesReplyUnknownFollower");
1525
1526         MockRaftActorContext leaderActorContext = createActorContext();
1527
1528         leader = new Leader(leaderActorContext);
1529
1530         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1531
1532         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1533
1534         assertEquals(RaftState.Leader, raftActorBehavior.state());
1535     }
1536
1537     @Test
1538     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1539         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1540
1541         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1542         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1543                 new FiniteDuration(1000, TimeUnit.SECONDS));
1544         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1545
1546         leaderActorContext.setReplicatedLog(
1547                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1548         long leaderCommitIndex = 3;
1549         leaderActorContext.setCommitIndex(leaderCommitIndex);
1550         leaderActorContext.setLastApplied(leaderCommitIndex);
1551
1552         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1553         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1554         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1555         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1556
1557         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1558
1559         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1560         followerActorContext.setCommitIndex(-1);
1561         followerActorContext.setLastApplied(-1);
1562
1563         Follower follower = new Follower(followerActorContext);
1564         followerActor.underlyingActor().setBehavior(follower);
1565
1566         leader = new Leader(leaderActorContext);
1567
1568         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1569         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1570
1571         MessageCollectorActor.clearMessages(followerActor);
1572         MessageCollectorActor.clearMessages(leaderActor);
1573
1574         // Verify initial AppendEntries sent with the leader's current commit index.
1575         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1576         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1577         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1578
1579         leaderActor.underlyingActor().setBehavior(leader);
1580
1581         leader.handleMessage(followerActor, appendEntriesReply);
1582
1583         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1584         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1585
1586         appendEntries = appendEntriesList.get(0);
1587         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1588         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1589         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1590
1591         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1592         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1593                 appendEntries.getEntries().get(0).getData());
1594         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1595         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1596                 appendEntries.getEntries().get(1).getData());
1597
1598         appendEntries = appendEntriesList.get(1);
1599         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1600         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1601         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1602
1603         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1604         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1605                 appendEntries.getEntries().get(0).getData());
1606         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1607         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1608                 appendEntries.getEntries().get(1).getData());
1609
1610         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1611         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1612
1613         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1614
1615         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1616         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1617     }
1618
1619     @Test
1620     public void testHandleRequestVoteReply(){
1621         logStart("testHandleRequestVoteReply");
1622
1623         MockRaftActorContext leaderActorContext = createActorContext();
1624
1625         leader = new Leader(leaderActorContext);
1626
1627         // Should be a no-op.
1628         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1629                 new RequestVoteReply(1, true));
1630
1631         assertEquals(RaftState.Leader, raftActorBehavior.state());
1632
1633         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1634
1635         assertEquals(RaftState.Leader, raftActorBehavior.state());
1636     }
1637
1638     @Test
1639     public void testIsolatedLeaderCheckNoFollowers() {
1640         logStart("testIsolatedLeaderCheckNoFollowers");
1641
1642         MockRaftActorContext leaderActorContext = createActorContext();
1643
1644         leader = new Leader(leaderActorContext);
1645         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1646         Assert.assertTrue(behavior instanceof Leader);
1647     }
1648
1649     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
1650         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1651         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1652
1653         MockRaftActorContext leaderActorContext = createActorContext();
1654
1655         Map<String, String> peerAddresses = new HashMap<>();
1656         peerAddresses.put("follower-1", followerActor1.path().toString());
1657         peerAddresses.put("follower-2", followerActor2.path().toString());
1658
1659         leaderActorContext.setPeerAddresses(peerAddresses);
1660         leaderActorContext.setRaftPolicy(raftPolicy);
1661
1662         leader = new Leader(leaderActorContext);
1663
1664         leader.markFollowerActive("follower-1");
1665         leader.markFollowerActive("follower-2");
1666         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1667         Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1668                 behavior instanceof Leader);
1669
1670         // kill 1 follower and verify if that got killed
1671         final JavaTestKit probe = new JavaTestKit(getSystem());
1672         probe.watch(followerActor1);
1673         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1674         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1675         assertEquals(termMsg1.getActor(), followerActor1);
1676
1677         leader.markFollowerInActive("follower-1");
1678         leader.markFollowerActive("follower-2");
1679         behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1680         Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1681                 behavior instanceof Leader);
1682
1683         // kill 2nd follower and leader should change to Isolated leader
1684         followerActor2.tell(PoisonPill.getInstance(), null);
1685         probe.watch(followerActor2);
1686         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1687         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1688         assertEquals(termMsg2.getActor(), followerActor2);
1689
1690         leader.markFollowerInActive("follower-2");
1691         return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1692     }
1693
1694     @Test
1695     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1696         logStart("testIsolatedLeaderCheckTwoFollowers");
1697
1698         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1699
1700         Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1701             behavior instanceof IsolatedLeader);
1702     }
1703
1704     @Test
1705     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
1706         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1707
1708         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1709
1710         Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1711                 behavior instanceof Leader);
1712     }
1713
1714     @Test
1715     public void testLaggingFollowerStarvation() throws Exception {
1716         logStart("testLaggingFollowerStarvation");
1717         new JavaTestKit(getSystem()) {{
1718             String leaderActorId = actorFactory.generateActorId("leader");
1719             String follower1ActorId = actorFactory.generateActorId("follower");
1720             String follower2ActorId = actorFactory.generateActorId("follower");
1721
1722             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1723                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1724             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1725             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1726
1727             MockRaftActorContext leaderActorContext =
1728                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1729
1730             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1731             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1732             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1733
1734             leaderActorContext.setConfigParams(configParams);
1735
1736             leaderActorContext.setReplicatedLog(
1737                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1738
1739             Map<String, String> peerAddresses = new HashMap<>();
1740             peerAddresses.put(follower1ActorId,
1741                     follower1Actor.path().toString());
1742             peerAddresses.put(follower2ActorId,
1743                     follower2Actor.path().toString());
1744
1745             leaderActorContext.setPeerAddresses(peerAddresses);
1746             leaderActorContext.getTermInformation().update(1, leaderActorId);
1747
1748             RaftActorBehavior leader = createBehavior(leaderActorContext);
1749
1750             leaderActor.underlyingActor().setBehavior(leader);
1751
1752             for(int i=1;i<6;i++) {
1753                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1754                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1755                 assertTrue(newBehavior == leader);
1756                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1757             }
1758
1759             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1760             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1761
1762             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1763                     heartbeats.size() > 1);
1764
1765             // Check if follower-2 got AppendEntries during this time and was not starved
1766             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1767
1768             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1769                     appendEntries.size() > 1);
1770
1771         }};
1772     }
1773
1774     @Override
1775     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1776             ActorRef actorRef, RaftRPC rpc) throws Exception {
1777         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1778         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1779     }
1780
1781     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1782
1783         private final long electionTimeOutIntervalMillis;
1784         private final int snapshotChunkSize;
1785
1786         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1787             super();
1788             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1789             this.snapshotChunkSize = snapshotChunkSize;
1790         }
1791
1792         @Override
1793         public FiniteDuration getElectionTimeOutInterval() {
1794             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1795         }
1796
1797         @Override
1798         public int getSnapshotChunkSize() {
1799             return snapshotChunkSize;
1800         }
1801     }
1802 }