Merge "BUG 2743 - Added support for runtime RPC's to netconf mdsal northbound."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
50     static final String FOLLOWER_ID = "follower";
51     public static final String LEADER_ID = "leader";
52
53     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
55
56     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
58
59     private Leader leader;
60
61     @Override
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         super.tearDown();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123
124     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
125         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
126         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
127                 1, index, payload);
128         actorContext.getReplicatedLog().append(newEntry);
129         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
130     }
131
132     @Test
133     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
134         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
135
136         MockRaftActorContext actorContext = createActorContextWithFollower();
137
138         long term = 1;
139         actorContext.getTermInformation().update(term, "");
140
141         leader = new Leader(actorContext);
142
143         // Leader will send an immediate heartbeat - ignore it.
144         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145
146         // The follower would normally reply - simulate that explicitly here.
147         long lastIndex = actorContext.getReplicatedLog().lastIndex();
148         leader.handleMessage(followerActor, new AppendEntriesReply(
149                 FOLLOWER_ID, term, true, lastIndex, term));
150         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
151
152         followerActor.underlyingActor().clear();
153
154         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156                 1, lastIndex + 1, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
159
160         // State should not change
161         assertTrue(raftBehavior instanceof Leader);
162
163         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166         assertEquals("Entries size", 1, appendEntries.getEntries().size());
167         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
170     }
171
172     @Test
173     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
174         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
175
176         MockRaftActorContext actorContext = createActorContextWithFollower();
177         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
178             @Override
179             public FiniteDuration getHeartBeatInterval() {
180                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
181             }
182         });
183
184         long term = 1;
185         actorContext.getTermInformation().update(term, "");
186
187         leader = new Leader(actorContext);
188
189         // Leader will send an immediate heartbeat - ignore it.
190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191
192         // The follower would normally reply - simulate that explicitly here.
193         long lastIndex = actorContext.getReplicatedLog().lastIndex();
194         leader.handleMessage(followerActor, new AppendEntriesReply(
195                 FOLLOWER_ID, term, true, lastIndex, term));
196         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197
198         followerActor.underlyingActor().clear();
199
200         for(int i=0;i<5;i++) {
201             sendReplicate(actorContext, lastIndex+i+1);
202         }
203
204         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
205         // We expect only 1 message to be sent because of two reasons,
206         // - an append entries reply was not received
207         // - the heartbeat interval has not expired
208         // In this scenario if multiple messages are sent they would likely be duplicates
209         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
210     }
211
212     @Test
213     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
214         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
215
216         MockRaftActorContext actorContext = createActorContextWithFollower();
217         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
218             @Override
219             public FiniteDuration getHeartBeatInterval() {
220                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
221             }
222         });
223
224         long term = 1;
225         actorContext.getTermInformation().update(term, "");
226
227         leader = new Leader(actorContext);
228
229         // Leader will send an immediate heartbeat - ignore it.
230         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
231
232         // The follower would normally reply - simulate that explicitly here.
233         long lastIndex = actorContext.getReplicatedLog().lastIndex();
234         leader.handleMessage(followerActor, new AppendEntriesReply(
235                 FOLLOWER_ID, term, true, lastIndex, term));
236         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
237
238         followerActor.underlyingActor().clear();
239
240         for(int i=0;i<3;i++) {
241             sendReplicate(actorContext, lastIndex+i+1);
242             leader.handleMessage(followerActor, new AppendEntriesReply(
243                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
244
245         }
246
247         for(int i=3;i<5;i++) {
248             sendReplicate(actorContext, lastIndex + i + 1);
249         }
250
251         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
252         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
253         // get sent to the follower - but not the 5th
254         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
255
256         for(int i=0;i<4;i++) {
257             long expected = allMessages.get(i).getEntries().get(0).getIndex();
258             assertEquals(expected, i+2);
259         }
260     }
261
262     @Test
263     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
264         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
265
266         MockRaftActorContext actorContext = createActorContextWithFollower();
267         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
268             @Override
269             public FiniteDuration getHeartBeatInterval() {
270                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
271             }
272         });
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         sendReplicate(actorContext, lastIndex+1);
291
292         // Wait slightly longer than heartbeat duration
293         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
294
295         leader.handleMessage(leaderActor, new SendHeartBeat());
296
297         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
298         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
299
300         assertEquals(1, allMessages.get(0).getEntries().size());
301         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
302         assertEquals(1, allMessages.get(1).getEntries().size());
303         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
304
305     }
306
307     @Test
308     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
309         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
310
311         MockRaftActorContext actorContext = createActorContextWithFollower();
312         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313             @Override
314             public FiniteDuration getHeartBeatInterval() {
315                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
316             }
317         });
318
319         long term = 1;
320         actorContext.getTermInformation().update(term, "");
321
322         leader = new Leader(actorContext);
323
324         // Leader will send an immediate heartbeat - ignore it.
325         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326
327         // The follower would normally reply - simulate that explicitly here.
328         long lastIndex = actorContext.getReplicatedLog().lastIndex();
329         leader.handleMessage(followerActor, new AppendEntriesReply(
330                 FOLLOWER_ID, term, true, lastIndex, term));
331         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332
333         followerActor.underlyingActor().clear();
334
335         for(int i=0;i<3;i++) {
336             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
337             leader.handleMessage(leaderActor, new SendHeartBeat());
338         }
339
340         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
342     }
343
344     @Test
345     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
346         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
347
348         MockRaftActorContext actorContext = createActorContextWithFollower();
349         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
350             @Override
351             public FiniteDuration getHeartBeatInterval() {
352                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
353             }
354         });
355
356         long term = 1;
357         actorContext.getTermInformation().update(term, "");
358
359         leader = new Leader(actorContext);
360
361         // Leader will send an immediate heartbeat - ignore it.
362         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
363
364         // The follower would normally reply - simulate that explicitly here.
365         long lastIndex = actorContext.getReplicatedLog().lastIndex();
366         leader.handleMessage(followerActor, new AppendEntriesReply(
367                 FOLLOWER_ID, term, true, lastIndex, term));
368         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
369
370         followerActor.underlyingActor().clear();
371
372         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
373         leader.handleMessage(leaderActor, new SendHeartBeat());
374         sendReplicate(actorContext, lastIndex+1);
375
376         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
378
379         assertEquals(0, allMessages.get(0).getEntries().size());
380         assertEquals(1, allMessages.get(1).getEntries().size());
381     }
382
383
384     @Test
385     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
386         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
387
388         MockRaftActorContext actorContext = createActorContext();
389
390         leader = new Leader(actorContext);
391
392         actorContext.setLastApplied(0);
393
394         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
395         long term = actorContext.getTermInformation().getCurrentTerm();
396         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
397                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
398
399         actorContext.getReplicatedLog().append(newEntry);
400
401         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
402                 new Replicate(leaderActor, "state-id", newEntry));
403
404         // State should not change
405         assertTrue(raftBehavior instanceof Leader);
406
407         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
408
409         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
410         // one since lastApplied state is 0.
411         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
412                 leaderActor, ApplyState.class);
413         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
414
415         for(int i = 0; i <= newLogIndex - 1; i++ ) {
416             ApplyState applyState = applyStateList.get(i);
417             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
418             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
419         }
420
421         ApplyState last = applyStateList.get((int) newLogIndex - 1);
422         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
423         assertEquals("getIdentifier", "state-id", last.getIdentifier());
424     }
425
426     @Test
427     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
428         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431
432         Map<String, String> leadersSnapshot = new HashMap<>();
433         leadersSnapshot.put("1", "A");
434         leadersSnapshot.put("2", "B");
435         leadersSnapshot.put("3", "C");
436
437         //clears leaders log
438         actorContext.getReplicatedLog().removeFrom(0);
439
440         final int followersLastIndex = 2;
441         final int snapshotIndex = 3;
442         final int newEntryIndex = 4;
443         final int snapshotTerm = 1;
444         final int currentTerm = 2;
445
446         // set the snapshot variables in replicatedlog
447         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
448         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
449         actorContext.setCommitIndex(followersLastIndex);
450         //set follower timeout to 2 mins, helps during debugging
451         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
452
453         leader = new Leader(actorContext);
454
455         // new entry
456         ReplicatedLogImplEntry entry =
457                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
458                         new MockRaftActorContext.MockPayload("D"));
459
460         //update follower timestamp
461         leader.markFollowerActive(FOLLOWER_ID);
462
463         ByteString bs = toByteString(leadersSnapshot);
464         leader.setSnapshot(Optional.of(bs));
465         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
466         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
467
468         //send first chunk and no InstallSnapshotReply received yet
469         fts.getNextChunk();
470         fts.incrementChunkIndex();
471
472         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
473                 TimeUnit.MILLISECONDS);
474
475         leader.handleMessage(leaderActor, new SendHeartBeat());
476
477         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478
479         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
480
481         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
482
483         //InstallSnapshotReply received
484         fts.markSendStatus(true);
485
486         leader.handleMessage(leaderActor, new SendHeartBeat());
487
488         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
489
490         assertEquals(snapshotIndex, is.getLastIncludedIndex());
491     }
492
493     @Test
494     public void testSendAppendEntriesSnapshotScenario() throws Exception {
495         logStart("testSendAppendEntriesSnapshotScenario");
496
497         MockRaftActorContext actorContext = createActorContextWithFollower();
498
499         Map<String, String> leadersSnapshot = new HashMap<>();
500         leadersSnapshot.put("1", "A");
501         leadersSnapshot.put("2", "B");
502         leadersSnapshot.put("3", "C");
503
504         //clears leaders log
505         actorContext.getReplicatedLog().removeFrom(0);
506
507         final int followersLastIndex = 2;
508         final int snapshotIndex = 3;
509         final int newEntryIndex = 4;
510         final int snapshotTerm = 1;
511         final int currentTerm = 2;
512
513         // set the snapshot variables in replicatedlog
514         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
515         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
516         actorContext.setCommitIndex(followersLastIndex);
517
518         leader = new Leader(actorContext);
519
520         // Leader will send an immediate heartbeat - ignore it.
521         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
522
523         // new entry
524         ReplicatedLogImplEntry entry =
525                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
526                         new MockRaftActorContext.MockPayload("D"));
527
528         actorContext.getReplicatedLog().append(entry);
529
530         //update follower timestamp
531         leader.markFollowerActive(FOLLOWER_ID);
532
533         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
534         RaftActorBehavior raftBehavior = leader.handleMessage(
535                 leaderActor, new Replicate(null, "state-id", entry));
536
537         assertTrue(raftBehavior instanceof Leader);
538
539         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
540     }
541
542     @Test
543     public void testInitiateInstallSnapshot() throws Exception {
544         logStart("testInitiateInstallSnapshot");
545
546         MockRaftActorContext actorContext = createActorContextWithFollower();
547
548         Map<String, String> leadersSnapshot = new HashMap<>();
549         leadersSnapshot.put("1", "A");
550         leadersSnapshot.put("2", "B");
551         leadersSnapshot.put("3", "C");
552
553         //clears leaders log
554         actorContext.getReplicatedLog().removeFrom(0);
555
556         final int followersLastIndex = 2;
557         final int snapshotIndex = 3;
558         final int newEntryIndex = 4;
559         final int snapshotTerm = 1;
560         final int currentTerm = 2;
561
562         // set the snapshot variables in replicatedlog
563         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
564         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
565         actorContext.setLastApplied(3);
566         actorContext.setCommitIndex(followersLastIndex);
567
568         leader = new Leader(actorContext);
569
570         // Leader will send an immediate heartbeat - ignore it.
571         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
572
573         // set the snapshot as absent and check if capture-snapshot is invoked.
574         leader.setSnapshot(Optional.<ByteString>absent());
575
576         // new entry
577         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
578                 new MockRaftActorContext.MockPayload("D"));
579
580         actorContext.getReplicatedLog().append(entry);
581
582         //update follower timestamp
583         leader.markFollowerActive(FOLLOWER_ID);
584
585         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
586
587         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
588
589         assertTrue(cs.isInstallSnapshotInitiated());
590         assertEquals(3, cs.getLastAppliedIndex());
591         assertEquals(1, cs.getLastAppliedTerm());
592         assertEquals(4, cs.getLastIndex());
593         assertEquals(2, cs.getLastTerm());
594
595         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
596         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
597
598         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
599         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
600     }
601
602     @Test
603     public void testInstallSnapshot() throws Exception {
604         logStart("testInstallSnapshot");
605
606         MockRaftActorContext actorContext = createActorContextWithFollower();
607
608         Map<String, String> leadersSnapshot = new HashMap<>();
609         leadersSnapshot.put("1", "A");
610         leadersSnapshot.put("2", "B");
611         leadersSnapshot.put("3", "C");
612
613         //clears leaders log
614         actorContext.getReplicatedLog().removeFrom(0);
615
616         final int followersLastIndex = 2;
617         final int snapshotIndex = 3;
618         final int snapshotTerm = 1;
619         final int currentTerm = 2;
620
621         // set the snapshot variables in replicatedlog
622         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
623         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
624         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
625         actorContext.setCommitIndex(followersLastIndex);
626
627         leader = new Leader(actorContext);
628
629         // Ignore initial heartbeat.
630         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
631
632         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
633                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
634
635         assertTrue(raftBehavior instanceof Leader);
636
637         // check if installsnapshot gets called with the correct values.
638
639         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
640
641         assertNotNull(installSnapshot.getData());
642         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
643         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
644
645         assertEquals(currentTerm, installSnapshot.getTerm());
646     }
647
648     @Test
649     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
650         logStart("testHandleInstallSnapshotReplyLastChunk");
651
652         MockRaftActorContext actorContext = createActorContextWithFollower();
653
654         final int followersLastIndex = 2;
655         final int snapshotIndex = 3;
656         final int snapshotTerm = 1;
657         final int currentTerm = 2;
658
659         actorContext.setCommitIndex(followersLastIndex);
660
661         leader = new Leader(actorContext);
662
663         // Ignore initial heartbeat.
664         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
665
666         Map<String, String> leadersSnapshot = new HashMap<>();
667         leadersSnapshot.put("1", "A");
668         leadersSnapshot.put("2", "B");
669         leadersSnapshot.put("3", "C");
670
671         // set the snapshot variables in replicatedlog
672
673         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
674         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
675         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
676
677         ByteString bs = toByteString(leadersSnapshot);
678         leader.setSnapshot(Optional.of(bs));
679         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
680         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
681         while(!fts.isLastChunk(fts.getChunkIndex())) {
682             fts.getNextChunk();
683             fts.incrementChunkIndex();
684         }
685
686         //clears leaders log
687         actorContext.getReplicatedLog().removeFrom(0);
688
689         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
690                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
691
692         assertTrue(raftBehavior instanceof Leader);
693
694         assertEquals(0, leader.followerSnapshotSize());
695         assertEquals(1, leader.followerLogSize());
696         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
697         assertNotNull(fli);
698         assertEquals(snapshotIndex, fli.getMatchIndex());
699         assertEquals(snapshotIndex, fli.getMatchIndex());
700         assertEquals(snapshotIndex + 1, fli.getNextIndex());
701     }
702
703     @Test
704     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
705         logStart("testSendSnapshotfromInstallSnapshotReply");
706
707         MockRaftActorContext actorContext = createActorContextWithFollower();
708
709         final int followersLastIndex = 2;
710         final int snapshotIndex = 3;
711         final int snapshotTerm = 1;
712         final int currentTerm = 2;
713
714         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
715             @Override
716             public int getSnapshotChunkSize() {
717                 return 50;
718             }
719         };
720         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
721         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
722
723         actorContext.setConfigParams(configParams);
724         actorContext.setCommitIndex(followersLastIndex);
725
726         leader = new Leader(actorContext);
727
728         Map<String, String> leadersSnapshot = new HashMap<>();
729         leadersSnapshot.put("1", "A");
730         leadersSnapshot.put("2", "B");
731         leadersSnapshot.put("3", "C");
732
733         // set the snapshot variables in replicatedlog
734         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
735         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
736         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
737
738         ByteString bs = toByteString(leadersSnapshot);
739         leader.setSnapshot(Optional.of(bs));
740
741         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
742
743         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
744
745         assertEquals(1, installSnapshot.getChunkIndex());
746         assertEquals(3, installSnapshot.getTotalChunks());
747
748         followerActor.underlyingActor().clear();
749         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
750                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
751
752         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
753
754         assertEquals(2, installSnapshot.getChunkIndex());
755         assertEquals(3, installSnapshot.getTotalChunks());
756
757         followerActor.underlyingActor().clear();
758         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
759                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
760
761         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
762
763         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
764         followerActor.underlyingActor().clear();
765         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
766                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
767
768         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
769
770         Assert.assertNull(installSnapshot);
771     }
772
773
774     @Test
775     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
776         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
777
778         MockRaftActorContext actorContext = createActorContextWithFollower();
779
780         final int followersLastIndex = 2;
781         final int snapshotIndex = 3;
782         final int snapshotTerm = 1;
783         final int currentTerm = 2;
784
785         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
786             @Override
787             public int getSnapshotChunkSize() {
788                 return 50;
789             }
790         });
791
792         actorContext.setCommitIndex(followersLastIndex);
793
794         leader = new Leader(actorContext);
795
796         Map<String, String> leadersSnapshot = new HashMap<>();
797         leadersSnapshot.put("1", "A");
798         leadersSnapshot.put("2", "B");
799         leadersSnapshot.put("3", "C");
800
801         // set the snapshot variables in replicatedlog
802         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
803         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
804         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
805
806         ByteString bs = toByteString(leadersSnapshot);
807         leader.setSnapshot(Optional.of(bs));
808
809         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
810         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
811
812         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
813
814         assertEquals(1, installSnapshot.getChunkIndex());
815         assertEquals(3, installSnapshot.getTotalChunks());
816
817         followerActor.underlyingActor().clear();
818
819         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
820                 FOLLOWER_ID, -1, false));
821
822         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
823                 TimeUnit.MILLISECONDS);
824
825         leader.handleMessage(leaderActor, new SendHeartBeat());
826
827         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
828
829         assertEquals(1, installSnapshot.getChunkIndex());
830         assertEquals(3, installSnapshot.getTotalChunks());
831     }
832
833     @Test
834     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
835         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
836
837         MockRaftActorContext actorContext = createActorContextWithFollower();
838
839         final int followersLastIndex = 2;
840         final int snapshotIndex = 3;
841         final int snapshotTerm = 1;
842         final int currentTerm = 2;
843
844         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
845             @Override
846             public int getSnapshotChunkSize() {
847                 return 50;
848             }
849         });
850
851         actorContext.setCommitIndex(followersLastIndex);
852
853         leader = new Leader(actorContext);
854
855         Map<String, String> leadersSnapshot = new HashMap<>();
856         leadersSnapshot.put("1", "A");
857         leadersSnapshot.put("2", "B");
858         leadersSnapshot.put("3", "C");
859
860         // set the snapshot variables in replicatedlog
861         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
862         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
863         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
864
865         ByteString bs = toByteString(leadersSnapshot);
866         leader.setSnapshot(Optional.of(bs));
867
868         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
869
870         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
871
872         assertEquals(1, installSnapshot.getChunkIndex());
873         assertEquals(3, installSnapshot.getTotalChunks());
874         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
875
876         int hashCode = installSnapshot.getData().hashCode();
877
878         followerActor.underlyingActor().clear();
879
880         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
881                 FOLLOWER_ID, 1, true));
882
883         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
884
885         assertEquals(2, installSnapshot.getChunkIndex());
886         assertEquals(3, installSnapshot.getTotalChunks());
887         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
888     }
889
890     @Test
891     public void testFollowerToSnapshotLogic() {
892         logStart("testFollowerToSnapshotLogic");
893
894         MockRaftActorContext actorContext = createActorContext();
895
896         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
897             @Override
898             public int getSnapshotChunkSize() {
899                 return 50;
900             }
901         });
902
903         leader = new Leader(actorContext);
904
905         Map<String, String> leadersSnapshot = new HashMap<>();
906         leadersSnapshot.put("1", "A");
907         leadersSnapshot.put("2", "B");
908         leadersSnapshot.put("3", "C");
909
910         ByteString bs = toByteString(leadersSnapshot);
911         byte[] barray = bs.toByteArray();
912
913         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
914         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
915
916         assertEquals(bs.size(), barray.length);
917
918         int chunkIndex=0;
919         for (int i=0; i < barray.length; i = i + 50) {
920             int j = i + 50;
921             chunkIndex++;
922
923             if (i + 50 > barray.length) {
924                 j = barray.length;
925             }
926
927             ByteString chunk = fts.getNextChunk();
928             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
929             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
930
931             fts.markSendStatus(true);
932             if (!fts.isLastChunk(chunkIndex)) {
933                 fts.incrementChunkIndex();
934             }
935         }
936
937         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
938     }
939
940     @Override protected RaftActorBehavior createBehavior(
941         RaftActorContext actorContext) {
942         return new Leader(actorContext);
943     }
944
945     @Override
946     protected MockRaftActorContext createActorContext() {
947         return createActorContext(leaderActor);
948     }
949
950     @Override
951     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
952         return createActorContext(LEADER_ID, actorRef);
953     }
954
955     private MockRaftActorContext createActorContextWithFollower() {
956         MockRaftActorContext actorContext = createActorContext();
957         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
958                 followerActor.path().toString()).build());
959         return actorContext;
960     }
961
962     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
963         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
964         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
965         configParams.setElectionTimeoutFactor(100000);
966         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
967         context.setConfigParams(configParams);
968         return context;
969     }
970
971     @Test
972     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
973         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
974
975         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
976
977         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
978
979         Follower follower = new Follower(followerActorContext);
980         followerActor.underlyingActor().setBehavior(follower);
981
982         Map<String, String> peerAddresses = new HashMap<>();
983         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
984
985         leaderActorContext.setPeerAddresses(peerAddresses);
986
987         leaderActorContext.getReplicatedLog().removeFrom(0);
988
989         //create 3 entries
990         leaderActorContext.setReplicatedLog(
991                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
992
993         leaderActorContext.setCommitIndex(1);
994
995         followerActorContext.getReplicatedLog().removeFrom(0);
996
997         // follower too has the exact same log entries and has the same commit index
998         followerActorContext.setReplicatedLog(
999                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1000
1001         followerActorContext.setCommitIndex(1);
1002
1003         leader = new Leader(leaderActorContext);
1004
1005         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1006
1007         assertEquals(1, appendEntries.getLeaderCommit());
1008         assertEquals(0, appendEntries.getEntries().size());
1009         assertEquals(0, appendEntries.getPrevLogIndex());
1010
1011         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1012                 leaderActor, AppendEntriesReply.class);
1013
1014         assertEquals(2, appendEntriesReply.getLogLastIndex());
1015         assertEquals(1, appendEntriesReply.getLogLastTerm());
1016
1017         // follower returns its next index
1018         assertEquals(2, appendEntriesReply.getLogLastIndex());
1019         assertEquals(1, appendEntriesReply.getLogLastTerm());
1020
1021         follower.close();
1022     }
1023
1024     @Test
1025     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1026         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1027
1028         MockRaftActorContext leaderActorContext = createActorContext();
1029
1030         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1031         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1032
1033         Follower follower = new Follower(followerActorContext);
1034         followerActor.underlyingActor().setBehavior(follower);
1035
1036         Map<String, String> leaderPeerAddresses = new HashMap<>();
1037         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1038
1039         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1040
1041         leaderActorContext.getReplicatedLog().removeFrom(0);
1042
1043         leaderActorContext.setReplicatedLog(
1044                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1045
1046         leaderActorContext.setCommitIndex(1);
1047
1048         followerActorContext.getReplicatedLog().removeFrom(0);
1049
1050         followerActorContext.setReplicatedLog(
1051                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1052
1053         // follower has the same log entries but its commit index > leaders commit index
1054         followerActorContext.setCommitIndex(2);
1055
1056         leader = new Leader(leaderActorContext);
1057
1058         // Initial heartbeat
1059         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1060
1061         assertEquals(1, appendEntries.getLeaderCommit());
1062         assertEquals(0, appendEntries.getEntries().size());
1063         assertEquals(0, appendEntries.getPrevLogIndex());
1064
1065         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1066                 leaderActor, AppendEntriesReply.class);
1067
1068         assertEquals(2, appendEntriesReply.getLogLastIndex());
1069         assertEquals(1, appendEntriesReply.getLogLastTerm());
1070
1071         leaderActor.underlyingActor().setBehavior(follower);
1072         leader.handleMessage(followerActor, appendEntriesReply);
1073
1074         leaderActor.underlyingActor().clear();
1075         followerActor.underlyingActor().clear();
1076
1077         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1078                 TimeUnit.MILLISECONDS);
1079
1080         leader.handleMessage(leaderActor, new SendHeartBeat());
1081
1082         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1083
1084         assertEquals(2, appendEntries.getLeaderCommit());
1085         assertEquals(0, appendEntries.getEntries().size());
1086         assertEquals(2, appendEntries.getPrevLogIndex());
1087
1088         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1089
1090         assertEquals(2, appendEntriesReply.getLogLastIndex());
1091         assertEquals(1, appendEntriesReply.getLogLastTerm());
1092
1093         assertEquals(2, followerActorContext.getCommitIndex());
1094
1095         follower.close();
1096     }
1097
1098     @Test
1099     public void testHandleAppendEntriesReplyFailure(){
1100         logStart("testHandleAppendEntriesReplyFailure");
1101
1102         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1103
1104         leader = new Leader(leaderActorContext);
1105
1106         // Send initial heartbeat reply with last index.
1107         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1108
1109         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1110         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1111
1112         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1113
1114         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1115
1116         assertEquals(RaftState.Leader, raftActorBehavior.state());
1117
1118         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1119     }
1120
1121     @Test
1122     public void testHandleAppendEntriesReplySuccess() throws Exception {
1123         logStart("testHandleAppendEntriesReplySuccess");
1124
1125         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1126
1127         leaderActorContext.setReplicatedLog(
1128                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1129
1130         leaderActorContext.setCommitIndex(1);
1131         leaderActorContext.setLastApplied(1);
1132         leaderActorContext.getTermInformation().update(1, "leader");
1133
1134         leader = new Leader(leaderActorContext);
1135
1136         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1137
1138         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1139
1140         assertEquals(RaftState.Leader, raftActorBehavior.state());
1141
1142         assertEquals(2, leaderActorContext.getCommitIndex());
1143
1144         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1145                 leaderActor, ApplyJournalEntries.class);
1146
1147         assertEquals(2, leaderActorContext.getLastApplied());
1148
1149         assertEquals(2, applyJournalEntries.getToIndex());
1150
1151         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1152                 ApplyState.class);
1153
1154         assertEquals(1,applyStateList.size());
1155
1156         ApplyState applyState = applyStateList.get(0);
1157
1158         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1159     }
1160
1161     @Test
1162     public void testHandleAppendEntriesReplyUnknownFollower(){
1163         logStart("testHandleAppendEntriesReplyUnknownFollower");
1164
1165         MockRaftActorContext leaderActorContext = createActorContext();
1166
1167         leader = new Leader(leaderActorContext);
1168
1169         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1170
1171         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1172
1173         assertEquals(RaftState.Leader, raftActorBehavior.state());
1174     }
1175
1176     @Test
1177     public void testHandleRequestVoteReply(){
1178         logStart("testHandleRequestVoteReply");
1179
1180         MockRaftActorContext leaderActorContext = createActorContext();
1181
1182         leader = new Leader(leaderActorContext);
1183
1184         // Should be a no-op.
1185         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1186                 new RequestVoteReply(1, true));
1187
1188         assertEquals(RaftState.Leader, raftActorBehavior.state());
1189
1190         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1191
1192         assertEquals(RaftState.Leader, raftActorBehavior.state());
1193     }
1194
1195     @Test
1196     public void testIsolatedLeaderCheckNoFollowers() {
1197         logStart("testIsolatedLeaderCheckNoFollowers");
1198
1199         MockRaftActorContext leaderActorContext = createActorContext();
1200
1201         leader = new Leader(leaderActorContext);
1202         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1203         Assert.assertTrue(behavior instanceof Leader);
1204     }
1205
1206     @Test
1207     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1208         logStart("testIsolatedLeaderCheckTwoFollowers");
1209
1210         new JavaTestKit(getSystem()) {{
1211
1212             ActorRef followerActor1 = getTestActor();
1213             ActorRef followerActor2 = getTestActor();
1214
1215             MockRaftActorContext leaderActorContext = createActorContext();
1216
1217             Map<String, String> peerAddresses = new HashMap<>();
1218             peerAddresses.put("follower-1", followerActor1.path().toString());
1219             peerAddresses.put("follower-2", followerActor2.path().toString());
1220
1221             leaderActorContext.setPeerAddresses(peerAddresses);
1222
1223             leader = new Leader(leaderActorContext);
1224
1225             leader.markFollowerActive("follower-1");
1226             leader.markFollowerActive("follower-2");
1227             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1228             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1229                 behavior instanceof Leader);
1230
1231             // kill 1 follower and verify if that got killed
1232             final JavaTestKit probe = new JavaTestKit(getSystem());
1233             probe.watch(followerActor1);
1234             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1235             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1236             assertEquals(termMsg1.getActor(), followerActor1);
1237
1238             leader.markFollowerInActive("follower-1");
1239             leader.markFollowerActive("follower-2");
1240             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1241             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1242                 behavior instanceof Leader);
1243
1244             // kill 2nd follower and leader should change to Isolated leader
1245             followerActor2.tell(PoisonPill.getInstance(), null);
1246             probe.watch(followerActor2);
1247             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1248             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1249             assertEquals(termMsg2.getActor(), followerActor2);
1250
1251             leader.markFollowerInActive("follower-2");
1252             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1253             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1254                 behavior instanceof IsolatedLeader);
1255         }};
1256     }
1257
1258
1259     @Test
1260     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1261         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1262
1263         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1264
1265         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1266         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1267         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1268
1269         leaderActorContext.setConfigParams(configParams);
1270
1271         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1272
1273         followerActorContext.setConfigParams(configParams);
1274         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1275
1276         Follower follower = new Follower(followerActorContext);
1277         followerActor.underlyingActor().setBehavior(follower);
1278
1279         leaderActorContext.getReplicatedLog().removeFrom(0);
1280         leaderActorContext.setCommitIndex(-1);
1281         leaderActorContext.setLastApplied(-1);
1282
1283         followerActorContext.getReplicatedLog().removeFrom(0);
1284         followerActorContext.setCommitIndex(-1);
1285         followerActorContext.setLastApplied(-1);
1286
1287         leader = new Leader(leaderActorContext);
1288
1289         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1290                 leaderActor, AppendEntriesReply.class);
1291
1292         leader.handleMessage(followerActor, appendEntriesReply);
1293
1294         // Clear initial heartbeat messages
1295
1296         leaderActor.underlyingActor().clear();
1297         followerActor.underlyingActor().clear();
1298
1299         // create 3 entries
1300         leaderActorContext.setReplicatedLog(
1301                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1302         leaderActorContext.setCommitIndex(1);
1303         leaderActorContext.setLastApplied(1);
1304
1305         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1306                 TimeUnit.MILLISECONDS);
1307
1308         leader.handleMessage(leaderActor, new SendHeartBeat());
1309
1310         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1311
1312         // Should send first log entry
1313         assertEquals(1, appendEntries.getLeaderCommit());
1314         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1315         assertEquals(-1, appendEntries.getPrevLogIndex());
1316
1317         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1318
1319         assertEquals(1, appendEntriesReply.getLogLastTerm());
1320         assertEquals(0, appendEntriesReply.getLogLastIndex());
1321
1322         followerActor.underlyingActor().clear();
1323
1324         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1325
1326         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1327
1328         // Should send second log entry
1329         assertEquals(1, appendEntries.getLeaderCommit());
1330         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1331
1332         follower.close();
1333     }
1334
1335     @Test
1336     public void testLaggingFollowerStarvation() throws Exception {
1337         logStart("testLaggingFollowerStarvation");
1338         new JavaTestKit(getSystem()) {{
1339             String leaderActorId = actorFactory.generateActorId("leader");
1340             String follower1ActorId = actorFactory.generateActorId("follower");
1341             String follower2ActorId = actorFactory.generateActorId("follower");
1342
1343             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1344                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1345             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1346             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1347
1348             MockRaftActorContext leaderActorContext =
1349                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1350
1351             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1352             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1353             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1354
1355             leaderActorContext.setConfigParams(configParams);
1356
1357             leaderActorContext.setReplicatedLog(
1358                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1359
1360             Map<String, String> peerAddresses = new HashMap<>();
1361             peerAddresses.put(follower1ActorId,
1362                     follower1Actor.path().toString());
1363             peerAddresses.put(follower2ActorId,
1364                     follower2Actor.path().toString());
1365
1366             leaderActorContext.setPeerAddresses(peerAddresses);
1367             leaderActorContext.getTermInformation().update(1, leaderActorId);
1368
1369             RaftActorBehavior leader = createBehavior(leaderActorContext);
1370
1371             leaderActor.underlyingActor().setBehavior(leader);
1372
1373             for(int i=1;i<6;i++) {
1374                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1375                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1376                 assertTrue(newBehavior == leader);
1377                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1378             }
1379
1380             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1381             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1382
1383             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1384                     heartbeats.size() > 1);
1385
1386             // Check if follower-2 got AppendEntries during this time and was not starved
1387             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1388
1389             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1390                     appendEntries.size() > 1);
1391
1392         }};
1393     }
1394
1395     @Override
1396     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1397             ActorRef actorRef, RaftRPC rpc) throws Exception {
1398         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1399         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1400     }
1401
1402     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1403
1404         private final long electionTimeOutIntervalMillis;
1405         private final int snapshotChunkSize;
1406
1407         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1408             super();
1409             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1410             this.snapshotChunkSize = snapshotChunkSize;
1411         }
1412
1413         @Override
1414         public FiniteDuration getElectionTimeOutInterval() {
1415             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1416         }
1417
1418         @Override
1419         public int getSnapshotChunkSize() {
1420             return snapshotChunkSize;
1421         }
1422     }
1423 }