BUG 2849 : Reduce sending of duplicate replication messages
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractLeaderTest {
50
51     static final String FOLLOWER_ID = "follower";
52
53     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
55
56     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
58
59     private Leader leader;
60
61     @Override
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         super.tearDown();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123
124     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
125         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
126         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
127                 1, index, payload);
128         actorContext.getReplicatedLog().append(newEntry);
129         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
130     }
131
132     @Test
133     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
134         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
135
136         MockRaftActorContext actorContext = createActorContextWithFollower();
137
138         long term = 1;
139         actorContext.getTermInformation().update(term, "");
140
141         leader = new Leader(actorContext);
142
143         // Leader will send an immediate heartbeat - ignore it.
144         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145
146         // The follower would normally reply - simulate that explicitly here.
147         long lastIndex = actorContext.getReplicatedLog().lastIndex();
148         leader.handleMessage(followerActor, new AppendEntriesReply(
149                 FOLLOWER_ID, term, true, lastIndex, term));
150         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
151
152         followerActor.underlyingActor().clear();
153
154         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156                 1, lastIndex + 1, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
159
160         // State should not change
161         assertTrue(raftBehavior instanceof Leader);
162
163         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166         assertEquals("Entries size", 1, appendEntries.getEntries().size());
167         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
170     }
171
172     @Test
173     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
174         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
175
176         MockRaftActorContext actorContext = createActorContextWithFollower();
177         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
178             @Override
179             public FiniteDuration getHeartBeatInterval() {
180                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
181             }
182         });
183
184         long term = 1;
185         actorContext.getTermInformation().update(term, "");
186
187         leader = new Leader(actorContext);
188
189         // Leader will send an immediate heartbeat - ignore it.
190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191
192         // The follower would normally reply - simulate that explicitly here.
193         long lastIndex = actorContext.getReplicatedLog().lastIndex();
194         leader.handleMessage(followerActor, new AppendEntriesReply(
195                 FOLLOWER_ID, term, true, lastIndex, term));
196         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197
198         followerActor.underlyingActor().clear();
199
200         for(int i=0;i<5;i++) {
201             sendReplicate(actorContext, lastIndex+i+1);
202         }
203
204         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
205         // We expect only 1 message to be sent because of two reasons,
206         // - an append entries reply was not received
207         // - the heartbeat interval has not expired
208         // In this scenario if multiple messages are sent they would likely be duplicates
209         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
210     }
211
212     @Test
213     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
214         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
215
216         MockRaftActorContext actorContext = createActorContextWithFollower();
217         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
218             @Override
219             public FiniteDuration getHeartBeatInterval() {
220                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
221             }
222         });
223
224         long term = 1;
225         actorContext.getTermInformation().update(term, "");
226
227         leader = new Leader(actorContext);
228
229         // Leader will send an immediate heartbeat - ignore it.
230         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
231
232         // The follower would normally reply - simulate that explicitly here.
233         long lastIndex = actorContext.getReplicatedLog().lastIndex();
234         leader.handleMessage(followerActor, new AppendEntriesReply(
235                 FOLLOWER_ID, term, true, lastIndex, term));
236         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
237
238         followerActor.underlyingActor().clear();
239
240         for(int i=0;i<3;i++) {
241             sendReplicate(actorContext, lastIndex+i+1);
242             leader.handleMessage(followerActor, new AppendEntriesReply(
243                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
244
245         }
246
247         for(int i=3;i<5;i++) {
248             sendReplicate(actorContext, lastIndex + i + 1);
249         }
250
251         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
252         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
253         // get sent to the follower - but not the 5th
254         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
255
256         for(int i=0;i<4;i++) {
257             long expected = allMessages.get(i).getEntries().get(0).getIndex();
258             assertEquals(expected, i+2);
259         }
260     }
261
262     @Test
263     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
264         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
265
266         MockRaftActorContext actorContext = createActorContextWithFollower();
267         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
268             @Override
269             public FiniteDuration getHeartBeatInterval() {
270                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
271             }
272         });
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         sendReplicate(actorContext, lastIndex+1);
291
292         // Wait slightly longer than heartbeat duration
293         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
294
295         leader.handleMessage(leaderActor, new SendHeartBeat());
296
297         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
298         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
299
300         assertEquals(1, allMessages.get(0).getEntries().size());
301         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
302         assertEquals(1, allMessages.get(1).getEntries().size());
303         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
304
305     }
306
307     @Test
308     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
309         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
310
311         MockRaftActorContext actorContext = createActorContextWithFollower();
312         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313             @Override
314             public FiniteDuration getHeartBeatInterval() {
315                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
316             }
317         });
318
319         long term = 1;
320         actorContext.getTermInformation().update(term, "");
321
322         leader = new Leader(actorContext);
323
324         // Leader will send an immediate heartbeat - ignore it.
325         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326
327         // The follower would normally reply - simulate that explicitly here.
328         long lastIndex = actorContext.getReplicatedLog().lastIndex();
329         leader.handleMessage(followerActor, new AppendEntriesReply(
330                 FOLLOWER_ID, term, true, lastIndex, term));
331         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332
333         followerActor.underlyingActor().clear();
334
335         for(int i=0;i<3;i++) {
336             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
337             leader.handleMessage(leaderActor, new SendHeartBeat());
338         }
339
340         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
342     }
343
344     @Test
345     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
346         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
347
348         MockRaftActorContext actorContext = createActorContextWithFollower();
349         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
350             @Override
351             public FiniteDuration getHeartBeatInterval() {
352                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
353             }
354         });
355
356         long term = 1;
357         actorContext.getTermInformation().update(term, "");
358
359         leader = new Leader(actorContext);
360
361         // Leader will send an immediate heartbeat - ignore it.
362         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
363
364         // The follower would normally reply - simulate that explicitly here.
365         long lastIndex = actorContext.getReplicatedLog().lastIndex();
366         leader.handleMessage(followerActor, new AppendEntriesReply(
367                 FOLLOWER_ID, term, true, lastIndex, term));
368         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
369
370         followerActor.underlyingActor().clear();
371
372         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
373         leader.handleMessage(leaderActor, new SendHeartBeat());
374         sendReplicate(actorContext, lastIndex+1);
375
376         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
378
379         assertEquals(0, allMessages.get(0).getEntries().size());
380         assertEquals(1, allMessages.get(1).getEntries().size());
381     }
382
383
384     @Test
385     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
386         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
387
388         MockRaftActorContext actorContext = createActorContext();
389
390         leader = new Leader(actorContext);
391
392         actorContext.setLastApplied(0);
393
394         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
395         long term = actorContext.getTermInformation().getCurrentTerm();
396         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
397                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
398
399         actorContext.getReplicatedLog().append(newEntry);
400
401         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
402                 new Replicate(leaderActor, "state-id", newEntry));
403
404         // State should not change
405         assertTrue(raftBehavior instanceof Leader);
406
407         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
408
409         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
410         // one since lastApplied state is 0.
411         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
412                 leaderActor, ApplyState.class);
413         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
414
415         for(int i = 0; i <= newLogIndex - 1; i++ ) {
416             ApplyState applyState = applyStateList.get(i);
417             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
418             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
419         }
420
421         ApplyState last = applyStateList.get((int) newLogIndex - 1);
422         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
423         assertEquals("getIdentifier", "state-id", last.getIdentifier());
424     }
425
426     @Test
427     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
428         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431
432         Map<String, String> leadersSnapshot = new HashMap<>();
433         leadersSnapshot.put("1", "A");
434         leadersSnapshot.put("2", "B");
435         leadersSnapshot.put("3", "C");
436
437         //clears leaders log
438         actorContext.getReplicatedLog().removeFrom(0);
439
440         final int followersLastIndex = 2;
441         final int snapshotIndex = 3;
442         final int newEntryIndex = 4;
443         final int snapshotTerm = 1;
444         final int currentTerm = 2;
445
446         // set the snapshot variables in replicatedlog
447         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
448         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
449         actorContext.setCommitIndex(followersLastIndex);
450         //set follower timeout to 2 mins, helps during debugging
451         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
452
453         leader = new Leader(actorContext);
454
455         // new entry
456         ReplicatedLogImplEntry entry =
457                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
458                         new MockRaftActorContext.MockPayload("D"));
459
460         //update follower timestamp
461         leader.markFollowerActive(FOLLOWER_ID);
462
463         ByteString bs = toByteString(leadersSnapshot);
464         leader.setSnapshot(Optional.of(bs));
465         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
466         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
467
468         //send first chunk and no InstallSnapshotReply received yet
469         fts.getNextChunk();
470         fts.incrementChunkIndex();
471
472         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
473                 TimeUnit.MILLISECONDS);
474
475         leader.handleMessage(leaderActor, new SendHeartBeat());
476
477         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478
479         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
480
481         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
482
483         //InstallSnapshotReply received
484         fts.markSendStatus(true);
485
486         leader.handleMessage(leaderActor, new SendHeartBeat());
487
488         InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
489                 InstallSnapshot.SERIALIZABLE_CLASS);
490
491         InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
492
493         assertEquals(snapshotIndex, is.getLastIncludedIndex());
494     }
495
496     @Test
497     public void testSendAppendEntriesSnapshotScenario() throws Exception {
498         logStart("testSendAppendEntriesSnapshotScenario");
499
500         MockRaftActorContext actorContext = createActorContextWithFollower();
501
502         Map<String, String> leadersSnapshot = new HashMap<>();
503         leadersSnapshot.put("1", "A");
504         leadersSnapshot.put("2", "B");
505         leadersSnapshot.put("3", "C");
506
507         //clears leaders log
508         actorContext.getReplicatedLog().removeFrom(0);
509
510         final int followersLastIndex = 2;
511         final int snapshotIndex = 3;
512         final int newEntryIndex = 4;
513         final int snapshotTerm = 1;
514         final int currentTerm = 2;
515
516         // set the snapshot variables in replicatedlog
517         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
518         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
519         actorContext.setCommitIndex(followersLastIndex);
520
521         leader = new Leader(actorContext);
522
523         // Leader will send an immediate heartbeat - ignore it.
524         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
525
526         // new entry
527         ReplicatedLogImplEntry entry =
528                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
529                         new MockRaftActorContext.MockPayload("D"));
530
531         //update follower timestamp
532         leader.markFollowerActive(FOLLOWER_ID);
533
534         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
535         RaftActorBehavior raftBehavior = leader.handleMessage(
536                 leaderActor, new Replicate(null, "state-id", entry));
537
538         assertTrue(raftBehavior instanceof Leader);
539
540         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
541     }
542
543     @Test
544     public void testInitiateInstallSnapshot() throws Exception {
545         logStart("testInitiateInstallSnapshot");
546
547         MockRaftActorContext actorContext = createActorContextWithFollower();
548
549         Map<String, String> leadersSnapshot = new HashMap<>();
550         leadersSnapshot.put("1", "A");
551         leadersSnapshot.put("2", "B");
552         leadersSnapshot.put("3", "C");
553
554         //clears leaders log
555         actorContext.getReplicatedLog().removeFrom(0);
556
557         final int followersLastIndex = 2;
558         final int snapshotIndex = 3;
559         final int newEntryIndex = 4;
560         final int snapshotTerm = 1;
561         final int currentTerm = 2;
562
563         // set the snapshot variables in replicatedlog
564         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
565         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
566         actorContext.setLastApplied(3);
567         actorContext.setCommitIndex(followersLastIndex);
568
569         leader = new Leader(actorContext);
570
571         // Leader will send an immediate heartbeat - ignore it.
572         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
573
574         // set the snapshot as absent and check if capture-snapshot is invoked.
575         leader.setSnapshot(Optional.<ByteString>absent());
576
577         // new entry
578         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
579                 new MockRaftActorContext.MockPayload("D"));
580
581         actorContext.getReplicatedLog().append(entry);
582
583         //update follower timestamp
584         leader.markFollowerActive(FOLLOWER_ID);
585
586         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
587
588         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
589
590         assertTrue(cs.isInstallSnapshotInitiated());
591         assertEquals(3, cs.getLastAppliedIndex());
592         assertEquals(1, cs.getLastAppliedTerm());
593         assertEquals(4, cs.getLastIndex());
594         assertEquals(2, cs.getLastTerm());
595
596         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
597         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
598
599         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
600         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
601     }
602
603     @Test
604     public void testInstallSnapshot() throws Exception {
605         logStart("testInstallSnapshot");
606
607         MockRaftActorContext actorContext = createActorContextWithFollower();
608
609         Map<String, String> leadersSnapshot = new HashMap<>();
610         leadersSnapshot.put("1", "A");
611         leadersSnapshot.put("2", "B");
612         leadersSnapshot.put("3", "C");
613
614         //clears leaders log
615         actorContext.getReplicatedLog().removeFrom(0);
616
617         final int followersLastIndex = 2;
618         final int snapshotIndex = 3;
619         final int snapshotTerm = 1;
620         final int currentTerm = 2;
621
622         // set the snapshot variables in replicatedlog
623         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
624         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
625         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
626         actorContext.setCommitIndex(followersLastIndex);
627
628         leader = new Leader(actorContext);
629
630         // Ignore initial heartbeat.
631         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
632
633         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
634                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
635
636         assertTrue(raftBehavior instanceof Leader);
637
638         // check if installsnapshot gets called with the correct values.
639
640         InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
641                 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
642
643         assertNotNull(installSnapshot.getData());
644         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
645         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
646
647         assertEquals(currentTerm, installSnapshot.getTerm());
648     }
649
650     @Test
651     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
652         logStart("testHandleInstallSnapshotReplyLastChunk");
653
654         MockRaftActorContext actorContext = createActorContextWithFollower();
655
656         final int followersLastIndex = 2;
657         final int snapshotIndex = 3;
658         final int snapshotTerm = 1;
659         final int currentTerm = 2;
660
661         actorContext.setCommitIndex(followersLastIndex);
662
663         leader = new Leader(actorContext);
664
665         // Ignore initial heartbeat.
666         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
667
668         Map<String, String> leadersSnapshot = new HashMap<>();
669         leadersSnapshot.put("1", "A");
670         leadersSnapshot.put("2", "B");
671         leadersSnapshot.put("3", "C");
672
673         // set the snapshot variables in replicatedlog
674
675         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
676         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
677         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
678
679         ByteString bs = toByteString(leadersSnapshot);
680         leader.setSnapshot(Optional.of(bs));
681         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
682         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
683         while(!fts.isLastChunk(fts.getChunkIndex())) {
684             fts.getNextChunk();
685             fts.incrementChunkIndex();
686         }
687
688         //clears leaders log
689         actorContext.getReplicatedLog().removeFrom(0);
690
691         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
692                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
693
694         assertTrue(raftBehavior instanceof Leader);
695
696         assertEquals(0, leader.followerSnapshotSize());
697         assertEquals(1, leader.followerLogSize());
698         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
699         assertNotNull(fli);
700         assertEquals(snapshotIndex, fli.getMatchIndex());
701         assertEquals(snapshotIndex, fli.getMatchIndex());
702         assertEquals(snapshotIndex + 1, fli.getNextIndex());
703     }
704
705     @Test
706     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
707         logStart("testSendSnapshotfromInstallSnapshotReply");
708
709         MockRaftActorContext actorContext = createActorContextWithFollower();
710
711         final int followersLastIndex = 2;
712         final int snapshotIndex = 3;
713         final int snapshotTerm = 1;
714         final int currentTerm = 2;
715
716         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
717             @Override
718             public int getSnapshotChunkSize() {
719                 return 50;
720             }
721         };
722         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
723         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
724
725         actorContext.setConfigParams(configParams);
726         actorContext.setCommitIndex(followersLastIndex);
727
728         leader = new Leader(actorContext);
729
730         Map<String, String> leadersSnapshot = new HashMap<>();
731         leadersSnapshot.put("1", "A");
732         leadersSnapshot.put("2", "B");
733         leadersSnapshot.put("3", "C");
734
735         // set the snapshot variables in replicatedlog
736         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
737         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
738         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
739
740         ByteString bs = toByteString(leadersSnapshot);
741         leader.setSnapshot(Optional.of(bs));
742
743         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
744
745         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
746                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
747
748         assertEquals(1, installSnapshot.getChunkIndex());
749         assertEquals(3, installSnapshot.getTotalChunks());
750
751         followerActor.underlyingActor().clear();
752         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
753                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
754
755         installSnapshot = MessageCollectorActor.expectFirstMatching(
756                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
757
758         assertEquals(2, installSnapshot.getChunkIndex());
759         assertEquals(3, installSnapshot.getTotalChunks());
760
761         followerActor.underlyingActor().clear();
762         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
763                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
764
765         installSnapshot = MessageCollectorActor.expectFirstMatching(
766                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
767
768         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
769         followerActor.underlyingActor().clear();
770         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
771                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
772
773         installSnapshot = MessageCollectorActor.getFirstMatching(
774                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
775
776         Assert.assertNull(installSnapshot);
777     }
778
779
780     @Test
781     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
782         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
783
784         MockRaftActorContext actorContext = createActorContextWithFollower();
785
786         final int followersLastIndex = 2;
787         final int snapshotIndex = 3;
788         final int snapshotTerm = 1;
789         final int currentTerm = 2;
790
791         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
792             @Override
793             public int getSnapshotChunkSize() {
794                 return 50;
795             }
796         });
797
798         actorContext.setCommitIndex(followersLastIndex);
799
800         leader = new Leader(actorContext);
801
802         Map<String, String> leadersSnapshot = new HashMap<>();
803         leadersSnapshot.put("1", "A");
804         leadersSnapshot.put("2", "B");
805         leadersSnapshot.put("3", "C");
806
807         // set the snapshot variables in replicatedlog
808         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
809         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
810         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
811
812         ByteString bs = toByteString(leadersSnapshot);
813         leader.setSnapshot(Optional.of(bs));
814
815         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
816
817         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
818                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
819
820         assertEquals(1, installSnapshot.getChunkIndex());
821         assertEquals(3, installSnapshot.getTotalChunks());
822
823         followerActor.underlyingActor().clear();
824
825         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
826                 FOLLOWER_ID, -1, false));
827
828         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
829                 TimeUnit.MILLISECONDS);
830
831         leader.handleMessage(leaderActor, new SendHeartBeat());
832
833         installSnapshot = MessageCollectorActor.expectFirstMatching(
834                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
835
836         assertEquals(1, installSnapshot.getChunkIndex());
837         assertEquals(3, installSnapshot.getTotalChunks());
838     }
839
840     @Test
841     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
842         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
843
844         MockRaftActorContext actorContext = createActorContextWithFollower();
845
846         final int followersLastIndex = 2;
847         final int snapshotIndex = 3;
848         final int snapshotTerm = 1;
849         final int currentTerm = 2;
850
851         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
852             @Override
853             public int getSnapshotChunkSize() {
854                 return 50;
855             }
856         });
857
858         actorContext.setCommitIndex(followersLastIndex);
859
860         leader = new Leader(actorContext);
861
862         Map<String, String> leadersSnapshot = new HashMap<>();
863         leadersSnapshot.put("1", "A");
864         leadersSnapshot.put("2", "B");
865         leadersSnapshot.put("3", "C");
866
867         // set the snapshot variables in replicatedlog
868         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
869         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
870         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
871
872         ByteString bs = toByteString(leadersSnapshot);
873         leader.setSnapshot(Optional.of(bs));
874
875         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
876
877         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
878                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
879
880         assertEquals(1, installSnapshot.getChunkIndex());
881         assertEquals(3, installSnapshot.getTotalChunks());
882         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
883
884         int hashCode = installSnapshot.getData().hashCode();
885
886         followerActor.underlyingActor().clear();
887
888         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
889                 FOLLOWER_ID, 1, true));
890
891         installSnapshot = MessageCollectorActor.expectFirstMatching(
892                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
893
894         assertEquals(2, installSnapshot.getChunkIndex());
895         assertEquals(3, installSnapshot.getTotalChunks());
896         assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
897     }
898
899     @Test
900     public void testFollowerToSnapshotLogic() {
901         logStart("testFollowerToSnapshotLogic");
902
903         MockRaftActorContext actorContext = createActorContext();
904
905         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
906             @Override
907             public int getSnapshotChunkSize() {
908                 return 50;
909             }
910         });
911
912         leader = new Leader(actorContext);
913
914         Map<String, String> leadersSnapshot = new HashMap<>();
915         leadersSnapshot.put("1", "A");
916         leadersSnapshot.put("2", "B");
917         leadersSnapshot.put("3", "C");
918
919         ByteString bs = toByteString(leadersSnapshot);
920         byte[] barray = bs.toByteArray();
921
922         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
923         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
924
925         assertEquals(bs.size(), barray.length);
926
927         int chunkIndex=0;
928         for (int i=0; i < barray.length; i = i + 50) {
929             int j = i + 50;
930             chunkIndex++;
931
932             if (i + 50 > barray.length) {
933                 j = barray.length;
934             }
935
936             ByteString chunk = fts.getNextChunk();
937             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
938             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
939
940             fts.markSendStatus(true);
941             if (!fts.isLastChunk(chunkIndex)) {
942                 fts.incrementChunkIndex();
943             }
944         }
945
946         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
947     }
948
949     @Override protected RaftActorBehavior createBehavior(
950         RaftActorContext actorContext) {
951         return new Leader(actorContext);
952     }
953
954     @Override
955     protected MockRaftActorContext createActorContext() {
956         return createActorContext(leaderActor);
957     }
958
959     @Override
960     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
961         return createActorContext("leader", actorRef);
962     }
963
964     private MockRaftActorContext createActorContextWithFollower() {
965         MockRaftActorContext actorContext = createActorContext();
966         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
967                 followerActor.path().toString()).build());
968         return actorContext;
969     }
970
971     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
972         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
973         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
974         configParams.setElectionTimeoutFactor(100000);
975         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
976         context.setConfigParams(configParams);
977         return context;
978     }
979
980     @Test
981     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
982         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
983
984         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
985
986         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
987
988         Follower follower = new Follower(followerActorContext);
989         followerActor.underlyingActor().setBehavior(follower);
990
991         Map<String, String> peerAddresses = new HashMap<>();
992         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
993
994         leaderActorContext.setPeerAddresses(peerAddresses);
995
996         leaderActorContext.getReplicatedLog().removeFrom(0);
997
998         //create 3 entries
999         leaderActorContext.setReplicatedLog(
1000                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1001
1002         leaderActorContext.setCommitIndex(1);
1003
1004         followerActorContext.getReplicatedLog().removeFrom(0);
1005
1006         // follower too has the exact same log entries and has the same commit index
1007         followerActorContext.setReplicatedLog(
1008                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1009
1010         followerActorContext.setCommitIndex(1);
1011
1012         leader = new Leader(leaderActorContext);
1013
1014         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1015
1016         assertEquals(1, appendEntries.getLeaderCommit());
1017         assertEquals(0, appendEntries.getEntries().size());
1018         assertEquals(0, appendEntries.getPrevLogIndex());
1019
1020         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1021                 leaderActor, AppendEntriesReply.class);
1022
1023         assertEquals(2, appendEntriesReply.getLogLastIndex());
1024         assertEquals(1, appendEntriesReply.getLogLastTerm());
1025
1026         // follower returns its next index
1027         assertEquals(2, appendEntriesReply.getLogLastIndex());
1028         assertEquals(1, appendEntriesReply.getLogLastTerm());
1029
1030         follower.close();
1031     }
1032
1033     @Test
1034     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1035         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1036
1037         MockRaftActorContext leaderActorContext = createActorContext();
1038
1039         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1040
1041         Follower follower = new Follower(followerActorContext);
1042         followerActor.underlyingActor().setBehavior(follower);
1043
1044         Map<String, String> peerAddresses = new HashMap<>();
1045         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1046
1047         leaderActorContext.setPeerAddresses(peerAddresses);
1048
1049         leaderActorContext.getReplicatedLog().removeFrom(0);
1050
1051         leaderActorContext.setReplicatedLog(
1052                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1053
1054         leaderActorContext.setCommitIndex(1);
1055
1056         followerActorContext.getReplicatedLog().removeFrom(0);
1057
1058         followerActorContext.setReplicatedLog(
1059                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1060
1061         // follower has the same log entries but its commit index > leaders commit index
1062         followerActorContext.setCommitIndex(2);
1063
1064         leader = new Leader(leaderActorContext);
1065
1066         // Initial heartbeat
1067         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1068
1069         assertEquals(1, appendEntries.getLeaderCommit());
1070         assertEquals(0, appendEntries.getEntries().size());
1071         assertEquals(0, appendEntries.getPrevLogIndex());
1072
1073         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1074                 leaderActor, AppendEntriesReply.class);
1075
1076         assertEquals(2, appendEntriesReply.getLogLastIndex());
1077         assertEquals(1, appendEntriesReply.getLogLastTerm());
1078
1079         leaderActor.underlyingActor().setBehavior(follower);
1080         leader.handleMessage(followerActor, appendEntriesReply);
1081
1082         leaderActor.underlyingActor().clear();
1083         followerActor.underlyingActor().clear();
1084
1085         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1086                 TimeUnit.MILLISECONDS);
1087
1088         leader.handleMessage(leaderActor, new SendHeartBeat());
1089
1090         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1091
1092         assertEquals(2, appendEntries.getLeaderCommit());
1093         assertEquals(0, appendEntries.getEntries().size());
1094         assertEquals(2, appendEntries.getPrevLogIndex());
1095
1096         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1097
1098         assertEquals(2, appendEntriesReply.getLogLastIndex());
1099         assertEquals(1, appendEntriesReply.getLogLastTerm());
1100
1101         assertEquals(2, followerActorContext.getCommitIndex());
1102
1103         follower.close();
1104     }
1105
1106     @Test
1107     public void testHandleAppendEntriesReplyFailure(){
1108         logStart("testHandleAppendEntriesReplyFailure");
1109
1110         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1111
1112         leader = new Leader(leaderActorContext);
1113
1114         // Send initial heartbeat reply with last index.
1115         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1116
1117         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1118         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1119
1120         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1121
1122         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1123
1124         assertEquals(RaftState.Leader, raftActorBehavior.state());
1125
1126         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1127     }
1128
1129     @Test
1130     public void testHandleAppendEntriesReplySuccess() throws Exception {
1131         logStart("testHandleAppendEntriesReplySuccess");
1132
1133         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1134
1135         leaderActorContext.setReplicatedLog(
1136                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1137
1138         leaderActorContext.setCommitIndex(1);
1139         leaderActorContext.setLastApplied(1);
1140         leaderActorContext.getTermInformation().update(1, "leader");
1141
1142         leader = new Leader(leaderActorContext);
1143
1144         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1145
1146         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1147
1148         assertEquals(RaftState.Leader, raftActorBehavior.state());
1149
1150         assertEquals(2, leaderActorContext.getCommitIndex());
1151
1152         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1153                 leaderActor, ApplyJournalEntries.class);
1154
1155         assertEquals(2, leaderActorContext.getLastApplied());
1156
1157         assertEquals(2, applyJournalEntries.getToIndex());
1158
1159         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1160                 ApplyState.class);
1161
1162         assertEquals(1,applyStateList.size());
1163
1164         ApplyState applyState = applyStateList.get(0);
1165
1166         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1167     }
1168
1169     @Test
1170     public void testHandleAppendEntriesReplyUnknownFollower(){
1171         logStart("testHandleAppendEntriesReplyUnknownFollower");
1172
1173         MockRaftActorContext leaderActorContext = createActorContext();
1174
1175         leader = new Leader(leaderActorContext);
1176
1177         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1178
1179         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1180
1181         assertEquals(RaftState.Leader, raftActorBehavior.state());
1182     }
1183
1184     @Test
1185     public void testHandleRequestVoteReply(){
1186         logStart("testHandleRequestVoteReply");
1187
1188         MockRaftActorContext leaderActorContext = createActorContext();
1189
1190         leader = new Leader(leaderActorContext);
1191
1192         // Should be a no-op.
1193         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1194                 new RequestVoteReply(1, true));
1195
1196         assertEquals(RaftState.Leader, raftActorBehavior.state());
1197
1198         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1199
1200         assertEquals(RaftState.Leader, raftActorBehavior.state());
1201     }
1202
1203     @Test
1204     public void testIsolatedLeaderCheckNoFollowers() {
1205         logStart("testIsolatedLeaderCheckNoFollowers");
1206
1207         MockRaftActorContext leaderActorContext = createActorContext();
1208
1209         leader = new Leader(leaderActorContext);
1210         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1211         Assert.assertTrue(behavior instanceof Leader);
1212     }
1213
1214     @Test
1215     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1216         logStart("testIsolatedLeaderCheckTwoFollowers");
1217
1218         new JavaTestKit(getSystem()) {{
1219
1220             ActorRef followerActor1 = getTestActor();
1221             ActorRef followerActor2 = getTestActor();
1222
1223             MockRaftActorContext leaderActorContext = createActorContext();
1224
1225             Map<String, String> peerAddresses = new HashMap<>();
1226             peerAddresses.put("follower-1", followerActor1.path().toString());
1227             peerAddresses.put("follower-2", followerActor2.path().toString());
1228
1229             leaderActorContext.setPeerAddresses(peerAddresses);
1230
1231             leader = new Leader(leaderActorContext);
1232
1233             leader.markFollowerActive("follower-1");
1234             leader.markFollowerActive("follower-2");
1235             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1236             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1237                 behavior instanceof Leader);
1238
1239             // kill 1 follower and verify if that got killed
1240             final JavaTestKit probe = new JavaTestKit(getSystem());
1241             probe.watch(followerActor1);
1242             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1243             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1244             assertEquals(termMsg1.getActor(), followerActor1);
1245
1246             leader.markFollowerInActive("follower-1");
1247             leader.markFollowerActive("follower-2");
1248             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1249             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1250                 behavior instanceof Leader);
1251
1252             // kill 2nd follower and leader should change to Isolated leader
1253             followerActor2.tell(PoisonPill.getInstance(), null);
1254             probe.watch(followerActor2);
1255             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1256             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1257             assertEquals(termMsg2.getActor(), followerActor2);
1258
1259             leader.markFollowerInActive("follower-2");
1260             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1261             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1262                 behavior instanceof IsolatedLeader);
1263         }};
1264     }
1265
1266
1267     @Test
1268     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1269         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1270
1271         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1272
1273         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1274         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1275         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1276
1277         leaderActorContext.setConfigParams(configParams);
1278
1279         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1280
1281         followerActorContext.setConfigParams(configParams);
1282
1283         Follower follower = new Follower(followerActorContext);
1284         followerActor.underlyingActor().setBehavior(follower);
1285
1286         leaderActorContext.getReplicatedLog().removeFrom(0);
1287         leaderActorContext.setCommitIndex(-1);
1288         leaderActorContext.setLastApplied(-1);
1289
1290         followerActorContext.getReplicatedLog().removeFrom(0);
1291         followerActorContext.setCommitIndex(-1);
1292         followerActorContext.setLastApplied(-1);
1293
1294         leader = new Leader(leaderActorContext);
1295
1296         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1297                 leaderActor, AppendEntriesReply.class);
1298
1299         leader.handleMessage(followerActor, appendEntriesReply);
1300
1301         // Clear initial heartbeat messages
1302
1303         leaderActor.underlyingActor().clear();
1304         followerActor.underlyingActor().clear();
1305
1306         // create 3 entries
1307         leaderActorContext.setReplicatedLog(
1308                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1309         leaderActorContext.setCommitIndex(1);
1310         leaderActorContext.setLastApplied(1);
1311
1312         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1313                 TimeUnit.MILLISECONDS);
1314
1315         leader.handleMessage(leaderActor, new SendHeartBeat());
1316
1317         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1318
1319         // Should send first log entry
1320         assertEquals(1, appendEntries.getLeaderCommit());
1321         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1322         assertEquals(-1, appendEntries.getPrevLogIndex());
1323
1324         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1325
1326         assertEquals(1, appendEntriesReply.getLogLastTerm());
1327         assertEquals(0, appendEntriesReply.getLogLastIndex());
1328
1329         followerActor.underlyingActor().clear();
1330
1331         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1332
1333         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1334
1335         // Should send second log entry
1336         assertEquals(1, appendEntries.getLeaderCommit());
1337         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1338
1339         follower.close();
1340     }
1341
1342     @Test
1343     public void testLaggingFollowerStarvation() throws Exception {
1344         logStart("testLaggingFollowerStarvation");
1345         new JavaTestKit(getSystem()) {{
1346             String leaderActorId = actorFactory.generateActorId("leader");
1347             String follower1ActorId = actorFactory.generateActorId("follower");
1348             String follower2ActorId = actorFactory.generateActorId("follower");
1349
1350             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1351                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1352             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1353             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1354
1355             MockRaftActorContext leaderActorContext =
1356                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1357
1358             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1359             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1360             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1361
1362             leaderActorContext.setConfigParams(configParams);
1363
1364             leaderActorContext.setReplicatedLog(
1365                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1366
1367             Map<String, String> peerAddresses = new HashMap<>();
1368             peerAddresses.put(follower1ActorId,
1369                     follower1Actor.path().toString());
1370             peerAddresses.put(follower2ActorId,
1371                     follower2Actor.path().toString());
1372
1373             leaderActorContext.setPeerAddresses(peerAddresses);
1374             leaderActorContext.getTermInformation().update(1, leaderActorId);
1375
1376             RaftActorBehavior leader = createBehavior(leaderActorContext);
1377
1378             leaderActor.underlyingActor().setBehavior(leader);
1379
1380             for(int i=1;i<6;i++) {
1381                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1382                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1383                 assertTrue(newBehavior == leader);
1384                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1385             }
1386
1387             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1388             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1389
1390             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1391                     heartbeats.size() > 1);
1392
1393             // Check if follower-2 got AppendEntries during this time and was not starved
1394             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1395
1396             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1397                     appendEntries.size() > 1);
1398
1399         }};
1400     }
1401
1402     @Override
1403     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1404             ActorRef actorRef, RaftRPC rpc) throws Exception {
1405         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1406         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1407     }
1408
1409     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1410
1411         private final long electionTimeOutIntervalMillis;
1412         private final int snapshotChunkSize;
1413
1414         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1415             super();
1416             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1417             this.snapshotChunkSize = snapshotChunkSize;
1418         }
1419
1420         @Override
1421         public FiniteDuration getElectionTimeOutInterval() {
1422             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1423         }
1424
1425         @Override
1426         public int getSnapshotChunkSize() {
1427             return snapshotChunkSize;
1428         }
1429     }
1430 }