BUG 2773 : Transition Shard to Leader state when it has no peers
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
    // Id used for the follower in simulated replies and in leader.getFollower(...) lookups.
    static final String FOLLOWER_ID = "follower";
    // Id constant for the leader.
    public static final String LEADER_ID = "leader";

    // Test actor passed as the sender of leader-internal messages (SendHeartBeat, Replicate) and
    // inspected for messages the leader sends to itself (e.g. ApplyState, CaptureSnapshot).
    // NOTE(review): actorFactory is not declared here - presumably inherited from AbstractLeaderTest.
    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));

    // Test actor standing in for the follower; tests inspect it for AppendEntries / InstallSnapshot
    // messages and reset it with underlyingActor().clear().
    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));

    // Leader behavior under test; assigned by each test and closed in tearDown().
    private Leader leader;
60
61     @Override
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         super.tearDown();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123
124     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
125         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
126         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
127                 1, index, payload);
128         actorContext.getReplicatedLog().append(newEntry);
129         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
130     }
131
132     @Test
133     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
134         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
135
136         MockRaftActorContext actorContext = createActorContextWithFollower();
137
138         long term = 1;
139         actorContext.getTermInformation().update(term, "");
140
141         leader = new Leader(actorContext);
142
143         // Leader will send an immediate heartbeat - ignore it.
144         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145
146         // The follower would normally reply - simulate that explicitly here.
147         long lastIndex = actorContext.getReplicatedLog().lastIndex();
148         leader.handleMessage(followerActor, new AppendEntriesReply(
149                 FOLLOWER_ID, term, true, lastIndex, term));
150         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
151
152         followerActor.underlyingActor().clear();
153
154         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156                 1, lastIndex + 1, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
159
160         // State should not change
161         assertTrue(raftBehavior instanceof Leader);
162
163         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166         assertEquals("Entries size", 1, appendEntries.getEntries().size());
167         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
170     }
171
172     @Test
173     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
174         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
175
176         MockRaftActorContext actorContext = createActorContextWithFollower();
177         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
178             @Override
179             public FiniteDuration getHeartBeatInterval() {
180                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
181             }
182         });
183
184         long term = 1;
185         actorContext.getTermInformation().update(term, "");
186
187         leader = new Leader(actorContext);
188
189         // Leader will send an immediate heartbeat - ignore it.
190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191
192         // The follower would normally reply - simulate that explicitly here.
193         long lastIndex = actorContext.getReplicatedLog().lastIndex();
194         leader.handleMessage(followerActor, new AppendEntriesReply(
195                 FOLLOWER_ID, term, true, lastIndex, term));
196         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197
198         followerActor.underlyingActor().clear();
199
200         for(int i=0;i<5;i++) {
201             sendReplicate(actorContext, lastIndex+i+1);
202         }
203
204         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
205         // We expect only 1 message to be sent because of two reasons,
206         // - an append entries reply was not received
207         // - the heartbeat interval has not expired
208         // In this scenario if multiple messages are sent they would likely be duplicates
209         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
210     }
211
212     @Test
213     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
214         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
215
216         MockRaftActorContext actorContext = createActorContextWithFollower();
217         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
218             @Override
219             public FiniteDuration getHeartBeatInterval() {
220                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
221             }
222         });
223
224         long term = 1;
225         actorContext.getTermInformation().update(term, "");
226
227         leader = new Leader(actorContext);
228
229         // Leader will send an immediate heartbeat - ignore it.
230         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
231
232         // The follower would normally reply - simulate that explicitly here.
233         long lastIndex = actorContext.getReplicatedLog().lastIndex();
234         leader.handleMessage(followerActor, new AppendEntriesReply(
235                 FOLLOWER_ID, term, true, lastIndex, term));
236         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
237
238         followerActor.underlyingActor().clear();
239
240         for(int i=0;i<3;i++) {
241             sendReplicate(actorContext, lastIndex+i+1);
242             leader.handleMessage(followerActor, new AppendEntriesReply(
243                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
244
245         }
246
247         for(int i=3;i<5;i++) {
248             sendReplicate(actorContext, lastIndex + i + 1);
249         }
250
251         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
252         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
253         // get sent to the follower - but not the 5th
254         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
255
256         for(int i=0;i<4;i++) {
257             long expected = allMessages.get(i).getEntries().get(0).getIndex();
258             assertEquals(expected, i+2);
259         }
260     }
261
262     @Test
263     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
264         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
265
266         MockRaftActorContext actorContext = createActorContextWithFollower();
267         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
268             @Override
269             public FiniteDuration getHeartBeatInterval() {
270                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
271             }
272         });
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         sendReplicate(actorContext, lastIndex+1);
291
292         // Wait slightly longer than heartbeat duration
293         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
294
295         leader.handleMessage(leaderActor, new SendHeartBeat());
296
297         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
298         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
299
300         assertEquals(1, allMessages.get(0).getEntries().size());
301         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
302         assertEquals(1, allMessages.get(1).getEntries().size());
303         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
304
305     }
306
307     @Test
308     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
309         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
310
311         MockRaftActorContext actorContext = createActorContextWithFollower();
312         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313             @Override
314             public FiniteDuration getHeartBeatInterval() {
315                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
316             }
317         });
318
319         long term = 1;
320         actorContext.getTermInformation().update(term, "");
321
322         leader = new Leader(actorContext);
323
324         // Leader will send an immediate heartbeat - ignore it.
325         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326
327         // The follower would normally reply - simulate that explicitly here.
328         long lastIndex = actorContext.getReplicatedLog().lastIndex();
329         leader.handleMessage(followerActor, new AppendEntriesReply(
330                 FOLLOWER_ID, term, true, lastIndex, term));
331         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332
333         followerActor.underlyingActor().clear();
334
335         for(int i=0;i<3;i++) {
336             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
337             leader.handleMessage(leaderActor, new SendHeartBeat());
338         }
339
340         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
342     }
343
344     @Test
345     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
346         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
347
348         MockRaftActorContext actorContext = createActorContextWithFollower();
349         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
350             @Override
351             public FiniteDuration getHeartBeatInterval() {
352                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
353             }
354         });
355
356         long term = 1;
357         actorContext.getTermInformation().update(term, "");
358
359         leader = new Leader(actorContext);
360
361         // Leader will send an immediate heartbeat - ignore it.
362         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
363
364         // The follower would normally reply - simulate that explicitly here.
365         long lastIndex = actorContext.getReplicatedLog().lastIndex();
366         leader.handleMessage(followerActor, new AppendEntriesReply(
367                 FOLLOWER_ID, term, true, lastIndex, term));
368         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
369
370         followerActor.underlyingActor().clear();
371
372         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
373         leader.handleMessage(leaderActor, new SendHeartBeat());
374         sendReplicate(actorContext, lastIndex+1);
375
376         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
378
379         assertEquals(0, allMessages.get(0).getEntries().size());
380         assertEquals(1, allMessages.get(1).getEntries().size());
381     }
382
383
384     @Test
385     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
386         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
387
388         MockRaftActorContext actorContext = createActorContext();
389
390         leader = new Leader(actorContext);
391
392         actorContext.setLastApplied(0);
393
394         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
395         long term = actorContext.getTermInformation().getCurrentTerm();
396         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
397                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
398
399         actorContext.getReplicatedLog().append(newEntry);
400
401         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
402                 new Replicate(leaderActor, "state-id", newEntry));
403
404         // State should not change
405         assertTrue(raftBehavior instanceof Leader);
406
407         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
408
409         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
410         // one since lastApplied state is 0.
411         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
412                 leaderActor, ApplyState.class);
413         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
414
415         for(int i = 0; i <= newLogIndex - 1; i++ ) {
416             ApplyState applyState = applyStateList.get(i);
417             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
418             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
419         }
420
421         ApplyState last = applyStateList.get((int) newLogIndex - 1);
422         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
423         assertEquals("getIdentifier", "state-id", last.getIdentifier());
424     }
425
426     @Test
427     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
428         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431
432         Map<String, String> leadersSnapshot = new HashMap<>();
433         leadersSnapshot.put("1", "A");
434         leadersSnapshot.put("2", "B");
435         leadersSnapshot.put("3", "C");
436
437         //clears leaders log
438         actorContext.getReplicatedLog().removeFrom(0);
439
440         final int followersLastIndex = 2;
441         final int snapshotIndex = 3;
442         final int newEntryIndex = 4;
443         final int snapshotTerm = 1;
444         final int currentTerm = 2;
445
446         // set the snapshot variables in replicatedlog
447         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
448         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
449         actorContext.setCommitIndex(followersLastIndex);
450         //set follower timeout to 2 mins, helps during debugging
451         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
452
453         leader = new Leader(actorContext);
454
455         // new entry
456         ReplicatedLogImplEntry entry =
457                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
458                         new MockRaftActorContext.MockPayload("D"));
459
460         //update follower timestamp
461         leader.markFollowerActive(FOLLOWER_ID);
462
463         ByteString bs = toByteString(leadersSnapshot);
464         leader.setSnapshot(Optional.of(bs));
465         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
466         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
467
468         //send first chunk and no InstallSnapshotReply received yet
469         fts.getNextChunk();
470         fts.incrementChunkIndex();
471
472         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
473                 TimeUnit.MILLISECONDS);
474
475         leader.handleMessage(leaderActor, new SendHeartBeat());
476
477         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478
479         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
480
481         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
482
483         //InstallSnapshotReply received
484         fts.markSendStatus(true);
485
486         leader.handleMessage(leaderActor, new SendHeartBeat());
487
488         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
489
490         assertEquals(snapshotIndex, is.getLastIncludedIndex());
491     }
492
493     @Test
494     public void testSendAppendEntriesSnapshotScenario() throws Exception {
495         logStart("testSendAppendEntriesSnapshotScenario");
496
497         MockRaftActorContext actorContext = createActorContextWithFollower();
498
499         Map<String, String> leadersSnapshot = new HashMap<>();
500         leadersSnapshot.put("1", "A");
501         leadersSnapshot.put("2", "B");
502         leadersSnapshot.put("3", "C");
503
504         //clears leaders log
505         actorContext.getReplicatedLog().removeFrom(0);
506
507         final int followersLastIndex = 2;
508         final int snapshotIndex = 3;
509         final int newEntryIndex = 4;
510         final int snapshotTerm = 1;
511         final int currentTerm = 2;
512
513         // set the snapshot variables in replicatedlog
514         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
515         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
516         actorContext.setCommitIndex(followersLastIndex);
517
518         leader = new Leader(actorContext);
519
520         // Leader will send an immediate heartbeat - ignore it.
521         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
522
523         // new entry
524         ReplicatedLogImplEntry entry =
525                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
526                         new MockRaftActorContext.MockPayload("D"));
527
528         //update follower timestamp
529         leader.markFollowerActive(FOLLOWER_ID);
530
531         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
532         RaftActorBehavior raftBehavior = leader.handleMessage(
533                 leaderActor, new Replicate(null, "state-id", entry));
534
535         assertTrue(raftBehavior instanceof Leader);
536
537         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
538     }
539
540     @Test
541     public void testInitiateInstallSnapshot() throws Exception {
542         logStart("testInitiateInstallSnapshot");
543
544         MockRaftActorContext actorContext = createActorContextWithFollower();
545
546         Map<String, String> leadersSnapshot = new HashMap<>();
547         leadersSnapshot.put("1", "A");
548         leadersSnapshot.put("2", "B");
549         leadersSnapshot.put("3", "C");
550
551         //clears leaders log
552         actorContext.getReplicatedLog().removeFrom(0);
553
554         final int followersLastIndex = 2;
555         final int snapshotIndex = 3;
556         final int newEntryIndex = 4;
557         final int snapshotTerm = 1;
558         final int currentTerm = 2;
559
560         // set the snapshot variables in replicatedlog
561         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
562         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
563         actorContext.setLastApplied(3);
564         actorContext.setCommitIndex(followersLastIndex);
565
566         leader = new Leader(actorContext);
567
568         // Leader will send an immediate heartbeat - ignore it.
569         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
570
571         // set the snapshot as absent and check if capture-snapshot is invoked.
572         leader.setSnapshot(Optional.<ByteString>absent());
573
574         // new entry
575         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
576                 new MockRaftActorContext.MockPayload("D"));
577
578         actorContext.getReplicatedLog().append(entry);
579
580         //update follower timestamp
581         leader.markFollowerActive(FOLLOWER_ID);
582
583         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
584
585         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
586
587         assertTrue(cs.isInstallSnapshotInitiated());
588         assertEquals(3, cs.getLastAppliedIndex());
589         assertEquals(1, cs.getLastAppliedTerm());
590         assertEquals(4, cs.getLastIndex());
591         assertEquals(2, cs.getLastTerm());
592
593         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
594         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
595
596         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
597         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
598     }
599
600     @Test
601     public void testInstallSnapshot() throws Exception {
602         logStart("testInstallSnapshot");
603
604         MockRaftActorContext actorContext = createActorContextWithFollower();
605
606         Map<String, String> leadersSnapshot = new HashMap<>();
607         leadersSnapshot.put("1", "A");
608         leadersSnapshot.put("2", "B");
609         leadersSnapshot.put("3", "C");
610
611         //clears leaders log
612         actorContext.getReplicatedLog().removeFrom(0);
613
614         final int followersLastIndex = 2;
615         final int snapshotIndex = 3;
616         final int snapshotTerm = 1;
617         final int currentTerm = 2;
618
619         // set the snapshot variables in replicatedlog
620         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
621         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
622         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
623         actorContext.setCommitIndex(followersLastIndex);
624
625         leader = new Leader(actorContext);
626
627         // Ignore initial heartbeat.
628         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
629
630         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
631                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
632
633         assertTrue(raftBehavior instanceof Leader);
634
635         // check if installsnapshot gets called with the correct values.
636
637         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
638
639         assertNotNull(installSnapshot.getData());
640         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
641         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
642
643         assertEquals(currentTerm, installSnapshot.getTerm());
644     }
645
646     @Test
647     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
648         logStart("testHandleInstallSnapshotReplyLastChunk");
649
650         MockRaftActorContext actorContext = createActorContextWithFollower();
651
652         final int followersLastIndex = 2;
653         final int snapshotIndex = 3;
654         final int snapshotTerm = 1;
655         final int currentTerm = 2;
656
657         actorContext.setCommitIndex(followersLastIndex);
658
659         leader = new Leader(actorContext);
660
661         // Ignore initial heartbeat.
662         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
663
664         Map<String, String> leadersSnapshot = new HashMap<>();
665         leadersSnapshot.put("1", "A");
666         leadersSnapshot.put("2", "B");
667         leadersSnapshot.put("3", "C");
668
669         // set the snapshot variables in replicatedlog
670
671         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
672         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
673         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
674
675         ByteString bs = toByteString(leadersSnapshot);
676         leader.setSnapshot(Optional.of(bs));
677         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
678         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
679         while(!fts.isLastChunk(fts.getChunkIndex())) {
680             fts.getNextChunk();
681             fts.incrementChunkIndex();
682         }
683
684         //clears leaders log
685         actorContext.getReplicatedLog().removeFrom(0);
686
687         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
688                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
689
690         assertTrue(raftBehavior instanceof Leader);
691
692         assertEquals(0, leader.followerSnapshotSize());
693         assertEquals(1, leader.followerLogSize());
694         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
695         assertNotNull(fli);
696         assertEquals(snapshotIndex, fli.getMatchIndex());
697         assertEquals(snapshotIndex, fli.getMatchIndex());
698         assertEquals(snapshotIndex + 1, fli.getNextIndex());
699     }
700
701     @Test
702     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
703         logStart("testSendSnapshotfromInstallSnapshotReply");
704
705         MockRaftActorContext actorContext = createActorContextWithFollower();
706
707         final int followersLastIndex = 2;
708         final int snapshotIndex = 3;
709         final int snapshotTerm = 1;
710         final int currentTerm = 2;
711
712         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
713             @Override
714             public int getSnapshotChunkSize() {
715                 return 50;
716             }
717         };
718         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
719         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
720
721         actorContext.setConfigParams(configParams);
722         actorContext.setCommitIndex(followersLastIndex);
723
724         leader = new Leader(actorContext);
725
726         Map<String, String> leadersSnapshot = new HashMap<>();
727         leadersSnapshot.put("1", "A");
728         leadersSnapshot.put("2", "B");
729         leadersSnapshot.put("3", "C");
730
731         // set the snapshot variables in replicatedlog
732         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
733         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
734         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
735
736         ByteString bs = toByteString(leadersSnapshot);
737         leader.setSnapshot(Optional.of(bs));
738
739         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
740
741         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
742
743         assertEquals(1, installSnapshot.getChunkIndex());
744         assertEquals(3, installSnapshot.getTotalChunks());
745
746         followerActor.underlyingActor().clear();
747         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
748                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
749
750         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
751
752         assertEquals(2, installSnapshot.getChunkIndex());
753         assertEquals(3, installSnapshot.getTotalChunks());
754
755         followerActor.underlyingActor().clear();
756         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
757                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
758
759         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
760
761         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
762         followerActor.underlyingActor().clear();
763         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
764                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
765
766         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
767
768         Assert.assertNull(installSnapshot);
769     }
770
771
772     @Test
773     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
774         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
775
776         MockRaftActorContext actorContext = createActorContextWithFollower();
777
778         final int followersLastIndex = 2;
779         final int snapshotIndex = 3;
780         final int snapshotTerm = 1;
781         final int currentTerm = 2;
782
783         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
784             @Override
785             public int getSnapshotChunkSize() {
786                 return 50;
787             }
788         });
789
790         actorContext.setCommitIndex(followersLastIndex);
791
792         leader = new Leader(actorContext);
793
794         Map<String, String> leadersSnapshot = new HashMap<>();
795         leadersSnapshot.put("1", "A");
796         leadersSnapshot.put("2", "B");
797         leadersSnapshot.put("3", "C");
798
799         // set the snapshot variables in replicatedlog
800         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
801         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
802         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
803
804         ByteString bs = toByteString(leadersSnapshot);
805         leader.setSnapshot(Optional.of(bs));
806
807         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
808         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
809
810         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
811
812         assertEquals(1, installSnapshot.getChunkIndex());
813         assertEquals(3, installSnapshot.getTotalChunks());
814
815         followerActor.underlyingActor().clear();
816
817         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
818                 FOLLOWER_ID, -1, false));
819
820         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
821                 TimeUnit.MILLISECONDS);
822
823         leader.handleMessage(leaderActor, new SendHeartBeat());
824
825         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
826
827         assertEquals(1, installSnapshot.getChunkIndex());
828         assertEquals(3, installSnapshot.getTotalChunks());
829     }
830
831     @Test
832     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
833         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
834
835         MockRaftActorContext actorContext = createActorContextWithFollower();
836
837         final int followersLastIndex = 2;
838         final int snapshotIndex = 3;
839         final int snapshotTerm = 1;
840         final int currentTerm = 2;
841
842         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
843             @Override
844             public int getSnapshotChunkSize() {
845                 return 50;
846             }
847         });
848
849         actorContext.setCommitIndex(followersLastIndex);
850
851         leader = new Leader(actorContext);
852
853         Map<String, String> leadersSnapshot = new HashMap<>();
854         leadersSnapshot.put("1", "A");
855         leadersSnapshot.put("2", "B");
856         leadersSnapshot.put("3", "C");
857
858         // set the snapshot variables in replicatedlog
859         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
860         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
861         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
862
863         ByteString bs = toByteString(leadersSnapshot);
864         leader.setSnapshot(Optional.of(bs));
865
866         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
867
868         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
869
870         assertEquals(1, installSnapshot.getChunkIndex());
871         assertEquals(3, installSnapshot.getTotalChunks());
872         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
873
874         int hashCode = installSnapshot.getData().hashCode();
875
876         followerActor.underlyingActor().clear();
877
878         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
879                 FOLLOWER_ID, 1, true));
880
881         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
882
883         assertEquals(2, installSnapshot.getChunkIndex());
884         assertEquals(3, installSnapshot.getTotalChunks());
885         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
886     }
887
888     @Test
889     public void testFollowerToSnapshotLogic() {
890         logStart("testFollowerToSnapshotLogic");
891
892         MockRaftActorContext actorContext = createActorContext();
893
894         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
895             @Override
896             public int getSnapshotChunkSize() {
897                 return 50;
898             }
899         });
900
901         leader = new Leader(actorContext);
902
903         Map<String, String> leadersSnapshot = new HashMap<>();
904         leadersSnapshot.put("1", "A");
905         leadersSnapshot.put("2", "B");
906         leadersSnapshot.put("3", "C");
907
908         ByteString bs = toByteString(leadersSnapshot);
909         byte[] barray = bs.toByteArray();
910
911         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
912         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
913
914         assertEquals(bs.size(), barray.length);
915
916         int chunkIndex=0;
917         for (int i=0; i < barray.length; i = i + 50) {
918             int j = i + 50;
919             chunkIndex++;
920
921             if (i + 50 > barray.length) {
922                 j = barray.length;
923             }
924
925             ByteString chunk = fts.getNextChunk();
926             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
927             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
928
929             fts.markSendStatus(true);
930             if (!fts.isLastChunk(chunkIndex)) {
931                 fts.incrementChunkIndex();
932             }
933         }
934
935         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
936     }
937
938     @Override protected RaftActorBehavior createBehavior(
939         RaftActorContext actorContext) {
940         return new Leader(actorContext);
941     }
942
943     @Override
944     protected MockRaftActorContext createActorContext() {
945         return createActorContext(leaderActor);
946     }
947
948     @Override
949     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
950         return createActorContext(LEADER_ID, actorRef);
951     }
952
953     private MockRaftActorContext createActorContextWithFollower() {
954         MockRaftActorContext actorContext = createActorContext();
955         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
956                 followerActor.path().toString()).build());
957         return actorContext;
958     }
959
960     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
961         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
962         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
963         configParams.setElectionTimeoutFactor(100000);
964         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
965         context.setConfigParams(configParams);
966         return context;
967     }
968
969     @Test
970     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
971         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
972
973         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
974
975         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
976
977         Follower follower = new Follower(followerActorContext);
978         followerActor.underlyingActor().setBehavior(follower);
979
980         Map<String, String> peerAddresses = new HashMap<>();
981         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
982
983         leaderActorContext.setPeerAddresses(peerAddresses);
984
985         leaderActorContext.getReplicatedLog().removeFrom(0);
986
987         //create 3 entries
988         leaderActorContext.setReplicatedLog(
989                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
990
991         leaderActorContext.setCommitIndex(1);
992
993         followerActorContext.getReplicatedLog().removeFrom(0);
994
995         // follower too has the exact same log entries and has the same commit index
996         followerActorContext.setReplicatedLog(
997                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
998
999         followerActorContext.setCommitIndex(1);
1000
1001         leader = new Leader(leaderActorContext);
1002
1003         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1004
1005         assertEquals(1, appendEntries.getLeaderCommit());
1006         assertEquals(0, appendEntries.getEntries().size());
1007         assertEquals(0, appendEntries.getPrevLogIndex());
1008
1009         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1010                 leaderActor, AppendEntriesReply.class);
1011
1012         assertEquals(2, appendEntriesReply.getLogLastIndex());
1013         assertEquals(1, appendEntriesReply.getLogLastTerm());
1014
1015         // follower returns its next index
1016         assertEquals(2, appendEntriesReply.getLogLastIndex());
1017         assertEquals(1, appendEntriesReply.getLogLastTerm());
1018
1019         follower.close();
1020     }
1021
1022     @Test
1023     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1024         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1025
1026         MockRaftActorContext leaderActorContext = createActorContext();
1027
1028         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1029         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1030
1031         Follower follower = new Follower(followerActorContext);
1032         followerActor.underlyingActor().setBehavior(follower);
1033
1034         Map<String, String> leaderPeerAddresses = new HashMap<>();
1035         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1036
1037         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1038
1039         leaderActorContext.getReplicatedLog().removeFrom(0);
1040
1041         leaderActorContext.setReplicatedLog(
1042                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1043
1044         leaderActorContext.setCommitIndex(1);
1045
1046         followerActorContext.getReplicatedLog().removeFrom(0);
1047
1048         followerActorContext.setReplicatedLog(
1049                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1050
1051         // follower has the same log entries but its commit index > leaders commit index
1052         followerActorContext.setCommitIndex(2);
1053
1054         leader = new Leader(leaderActorContext);
1055
1056         // Initial heartbeat
1057         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1058
1059         assertEquals(1, appendEntries.getLeaderCommit());
1060         assertEquals(0, appendEntries.getEntries().size());
1061         assertEquals(0, appendEntries.getPrevLogIndex());
1062
1063         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1064                 leaderActor, AppendEntriesReply.class);
1065
1066         assertEquals(2, appendEntriesReply.getLogLastIndex());
1067         assertEquals(1, appendEntriesReply.getLogLastTerm());
1068
1069         leaderActor.underlyingActor().setBehavior(follower);
1070         leader.handleMessage(followerActor, appendEntriesReply);
1071
1072         leaderActor.underlyingActor().clear();
1073         followerActor.underlyingActor().clear();
1074
1075         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1076                 TimeUnit.MILLISECONDS);
1077
1078         leader.handleMessage(leaderActor, new SendHeartBeat());
1079
1080         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1081
1082         assertEquals(2, appendEntries.getLeaderCommit());
1083         assertEquals(0, appendEntries.getEntries().size());
1084         assertEquals(2, appendEntries.getPrevLogIndex());
1085
1086         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1087
1088         assertEquals(2, appendEntriesReply.getLogLastIndex());
1089         assertEquals(1, appendEntriesReply.getLogLastTerm());
1090
1091         assertEquals(2, followerActorContext.getCommitIndex());
1092
1093         follower.close();
1094     }
1095
1096     @Test
1097     public void testHandleAppendEntriesReplyFailure(){
1098         logStart("testHandleAppendEntriesReplyFailure");
1099
1100         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1101
1102         leader = new Leader(leaderActorContext);
1103
1104         // Send initial heartbeat reply with last index.
1105         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1106
1107         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1108         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1109
1110         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1111
1112         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1113
1114         assertEquals(RaftState.Leader, raftActorBehavior.state());
1115
1116         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1117     }
1118
1119     @Test
1120     public void testHandleAppendEntriesReplySuccess() throws Exception {
1121         logStart("testHandleAppendEntriesReplySuccess");
1122
1123         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1124
1125         leaderActorContext.setReplicatedLog(
1126                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1127
1128         leaderActorContext.setCommitIndex(1);
1129         leaderActorContext.setLastApplied(1);
1130         leaderActorContext.getTermInformation().update(1, "leader");
1131
1132         leader = new Leader(leaderActorContext);
1133
1134         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1135
1136         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1137
1138         assertEquals(RaftState.Leader, raftActorBehavior.state());
1139
1140         assertEquals(2, leaderActorContext.getCommitIndex());
1141
1142         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1143                 leaderActor, ApplyJournalEntries.class);
1144
1145         assertEquals(2, leaderActorContext.getLastApplied());
1146
1147         assertEquals(2, applyJournalEntries.getToIndex());
1148
1149         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1150                 ApplyState.class);
1151
1152         assertEquals(1,applyStateList.size());
1153
1154         ApplyState applyState = applyStateList.get(0);
1155
1156         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1157     }
1158
1159     @Test
1160     public void testHandleAppendEntriesReplyUnknownFollower(){
1161         logStart("testHandleAppendEntriesReplyUnknownFollower");
1162
1163         MockRaftActorContext leaderActorContext = createActorContext();
1164
1165         leader = new Leader(leaderActorContext);
1166
1167         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1168
1169         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1170
1171         assertEquals(RaftState.Leader, raftActorBehavior.state());
1172     }
1173
1174     @Test
1175     public void testHandleRequestVoteReply(){
1176         logStart("testHandleRequestVoteReply");
1177
1178         MockRaftActorContext leaderActorContext = createActorContext();
1179
1180         leader = new Leader(leaderActorContext);
1181
1182         // Should be a no-op.
1183         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1184                 new RequestVoteReply(1, true));
1185
1186         assertEquals(RaftState.Leader, raftActorBehavior.state());
1187
1188         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1189
1190         assertEquals(RaftState.Leader, raftActorBehavior.state());
1191     }
1192
1193     @Test
1194     public void testIsolatedLeaderCheckNoFollowers() {
1195         logStart("testIsolatedLeaderCheckNoFollowers");
1196
1197         MockRaftActorContext leaderActorContext = createActorContext();
1198
1199         leader = new Leader(leaderActorContext);
1200         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1201         Assert.assertTrue(behavior instanceof Leader);
1202     }
1203
1204     @Test
1205     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1206         logStart("testIsolatedLeaderCheckTwoFollowers");
1207
1208         new JavaTestKit(getSystem()) {{
1209
1210             ActorRef followerActor1 = getTestActor();
1211             ActorRef followerActor2 = getTestActor();
1212
1213             MockRaftActorContext leaderActorContext = createActorContext();
1214
1215             Map<String, String> peerAddresses = new HashMap<>();
1216             peerAddresses.put("follower-1", followerActor1.path().toString());
1217             peerAddresses.put("follower-2", followerActor2.path().toString());
1218
1219             leaderActorContext.setPeerAddresses(peerAddresses);
1220
1221             leader = new Leader(leaderActorContext);
1222
1223             leader.markFollowerActive("follower-1");
1224             leader.markFollowerActive("follower-2");
1225             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1226             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1227                 behavior instanceof Leader);
1228
1229             // kill 1 follower and verify if that got killed
1230             final JavaTestKit probe = new JavaTestKit(getSystem());
1231             probe.watch(followerActor1);
1232             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1233             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1234             assertEquals(termMsg1.getActor(), followerActor1);
1235
1236             leader.markFollowerInActive("follower-1");
1237             leader.markFollowerActive("follower-2");
1238             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1239             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1240                 behavior instanceof Leader);
1241
1242             // kill 2nd follower and leader should change to Isolated leader
1243             followerActor2.tell(PoisonPill.getInstance(), null);
1244             probe.watch(followerActor2);
1245             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1246             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1247             assertEquals(termMsg2.getActor(), followerActor2);
1248
1249             leader.markFollowerInActive("follower-2");
1250             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1251             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1252                 behavior instanceof IsolatedLeader);
1253         }};
1254     }
1255
1256
1257     @Test
1258     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1259         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1260
1261         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1262
1263         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1264         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1265         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1266
1267         leaderActorContext.setConfigParams(configParams);
1268
1269         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1270
1271         followerActorContext.setConfigParams(configParams);
1272         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1273
1274         Follower follower = new Follower(followerActorContext);
1275         followerActor.underlyingActor().setBehavior(follower);
1276
1277         leaderActorContext.getReplicatedLog().removeFrom(0);
1278         leaderActorContext.setCommitIndex(-1);
1279         leaderActorContext.setLastApplied(-1);
1280
1281         followerActorContext.getReplicatedLog().removeFrom(0);
1282         followerActorContext.setCommitIndex(-1);
1283         followerActorContext.setLastApplied(-1);
1284
1285         leader = new Leader(leaderActorContext);
1286
1287         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1288                 leaderActor, AppendEntriesReply.class);
1289
1290         leader.handleMessage(followerActor, appendEntriesReply);
1291
1292         // Clear initial heartbeat messages
1293
1294         leaderActor.underlyingActor().clear();
1295         followerActor.underlyingActor().clear();
1296
1297         // create 3 entries
1298         leaderActorContext.setReplicatedLog(
1299                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1300         leaderActorContext.setCommitIndex(1);
1301         leaderActorContext.setLastApplied(1);
1302
1303         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1304                 TimeUnit.MILLISECONDS);
1305
1306         leader.handleMessage(leaderActor, new SendHeartBeat());
1307
1308         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1309
1310         // Should send first log entry
1311         assertEquals(1, appendEntries.getLeaderCommit());
1312         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1313         assertEquals(-1, appendEntries.getPrevLogIndex());
1314
1315         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1316
1317         assertEquals(1, appendEntriesReply.getLogLastTerm());
1318         assertEquals(0, appendEntriesReply.getLogLastIndex());
1319
1320         followerActor.underlyingActor().clear();
1321
1322         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1323
1324         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1325
1326         // Should send second log entry
1327         assertEquals(1, appendEntries.getLeaderCommit());
1328         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1329
1330         follower.close();
1331     }
1332
1333     @Test
1334     public void testLaggingFollowerStarvation() throws Exception {
1335         logStart("testLaggingFollowerStarvation");
1336         new JavaTestKit(getSystem()) {{
1337             String leaderActorId = actorFactory.generateActorId("leader");
1338             String follower1ActorId = actorFactory.generateActorId("follower");
1339             String follower2ActorId = actorFactory.generateActorId("follower");
1340
1341             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1342                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1343             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1344             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1345
1346             MockRaftActorContext leaderActorContext =
1347                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1348
1349             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1350             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1351             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1352
1353             leaderActorContext.setConfigParams(configParams);
1354
1355             leaderActorContext.setReplicatedLog(
1356                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1357
1358             Map<String, String> peerAddresses = new HashMap<>();
1359             peerAddresses.put(follower1ActorId,
1360                     follower1Actor.path().toString());
1361             peerAddresses.put(follower2ActorId,
1362                     follower2Actor.path().toString());
1363
1364             leaderActorContext.setPeerAddresses(peerAddresses);
1365             leaderActorContext.getTermInformation().update(1, leaderActorId);
1366
1367             RaftActorBehavior leader = createBehavior(leaderActorContext);
1368
1369             leaderActor.underlyingActor().setBehavior(leader);
1370
1371             for(int i=1;i<6;i++) {
1372                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1373                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1374                 assertTrue(newBehavior == leader);
1375                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1376             }
1377
1378             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1379             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1380
1381             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1382                     heartbeats.size() > 1);
1383
1384             // Check if follower-2 got AppendEntries during this time and was not starved
1385             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1386
1387             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1388                     appendEntries.size() > 1);
1389
1390         }};
1391     }
1392
1393     @Override
1394     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1395             ActorRef actorRef, RaftRPC rpc) throws Exception {
1396         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1397         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1398     }
1399
1400     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1401
1402         private final long electionTimeOutIntervalMillis;
1403         private final int snapshotChunkSize;
1404
1405         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1406             super();
1407             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1408             this.snapshotChunkSize = snapshotChunkSize;
1409         }
1410
1411         @Override
1412         public FiniteDuration getElectionTimeOutInterval() {
1413             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1414         }
1415
1416         @Override
1417         public int getSnapshotChunkSize() {
1418             return snapshotChunkSize;
1419         }
1420     }
1421 }