Merge "Improve performance of XmlElement.getName"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
50     static final String FOLLOWER_ID = "follower";
51     public static final String LEADER_ID = "leader";
52
53     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
55
56     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
58
59     private Leader leader;
60
61     @Override
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         super.tearDown();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88
89         long term = 1;
90         actorContext.getTermInformation().update(term, "");
91
92         leader = new Leader(actorContext);
93
94         // Leader should send an immediate heartbeat with no entries as follower is inactive.
95         long lastIndex = actorContext.getReplicatedLog().lastIndex();
96         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
97         assertEquals("getTerm", term, appendEntries.getTerm());
98         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
99         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
100         assertEquals("Entries size", 0, appendEntries.getEntries().size());
101
102         // The follower would normally reply - simulate that explicitly here.
103         leader.handleMessage(followerActor, new AppendEntriesReply(
104                 FOLLOWER_ID, term, true, lastIndex - 1, term));
105         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
106
107         followerActor.underlyingActor().clear();
108
109         // Sleep for the heartbeat interval so AppendEntries is sent.
110         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
111                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
112
113         leader.handleMessage(leaderActor, new SendHeartBeat());
114
115         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
116         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
117         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
118         assertEquals("Entries size", 1, appendEntries.getEntries().size());
119         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
120         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
121     }
122
123
124     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
125         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
126         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
127                 1, index, payload);
128         actorContext.getReplicatedLog().append(newEntry);
129         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
130     }
131
132     @Test
133     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
134         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
135
136         MockRaftActorContext actorContext = createActorContextWithFollower();
137
138         long term = 1;
139         actorContext.getTermInformation().update(term, "");
140
141         leader = new Leader(actorContext);
142
143         // Leader will send an immediate heartbeat - ignore it.
144         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
145
146         // The follower would normally reply - simulate that explicitly here.
147         long lastIndex = actorContext.getReplicatedLog().lastIndex();
148         leader.handleMessage(followerActor, new AppendEntriesReply(
149                 FOLLOWER_ID, term, true, lastIndex, term));
150         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
151
152         followerActor.underlyingActor().clear();
153
154         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
155         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
156                 1, lastIndex + 1, payload);
157         actorContext.getReplicatedLog().append(newEntry);
158         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
159
160         // State should not change
161         assertTrue(raftBehavior instanceof Leader);
162
163         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
164         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
165         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
166         assertEquals("Entries size", 1, appendEntries.getEntries().size());
167         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
168         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
169         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
170     }
171
172     @Test
173     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
174         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
175
176         MockRaftActorContext actorContext = createActorContextWithFollower();
177         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
178             @Override
179             public FiniteDuration getHeartBeatInterval() {
180                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
181             }
182         });
183
184         long term = 1;
185         actorContext.getTermInformation().update(term, "");
186
187         leader = new Leader(actorContext);
188
189         // Leader will send an immediate heartbeat - ignore it.
190         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
191
192         // The follower would normally reply - simulate that explicitly here.
193         long lastIndex = actorContext.getReplicatedLog().lastIndex();
194         leader.handleMessage(followerActor, new AppendEntriesReply(
195                 FOLLOWER_ID, term, true, lastIndex, term));
196         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
197
198         followerActor.underlyingActor().clear();
199
200         for(int i=0;i<5;i++) {
201             sendReplicate(actorContext, lastIndex+i+1);
202         }
203
204         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
205         // We expect only 1 message to be sent because of two reasons,
206         // - an append entries reply was not received
207         // - the heartbeat interval has not expired
208         // In this scenario if multiple messages are sent they would likely be duplicates
209         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
210     }
211
212     @Test
213     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
214         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
215
216         MockRaftActorContext actorContext = createActorContextWithFollower();
217         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
218             @Override
219             public FiniteDuration getHeartBeatInterval() {
220                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
221             }
222         });
223
224         long term = 1;
225         actorContext.getTermInformation().update(term, "");
226
227         leader = new Leader(actorContext);
228
229         // Leader will send an immediate heartbeat - ignore it.
230         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
231
232         // The follower would normally reply - simulate that explicitly here.
233         long lastIndex = actorContext.getReplicatedLog().lastIndex();
234         leader.handleMessage(followerActor, new AppendEntriesReply(
235                 FOLLOWER_ID, term, true, lastIndex, term));
236         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
237
238         followerActor.underlyingActor().clear();
239
240         for(int i=0;i<3;i++) {
241             sendReplicate(actorContext, lastIndex+i+1);
242             leader.handleMessage(followerActor, new AppendEntriesReply(
243                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
244
245         }
246
247         for(int i=3;i<5;i++) {
248             sendReplicate(actorContext, lastIndex + i + 1);
249         }
250
251         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
252         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
253         // get sent to the follower - but not the 5th
254         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
255
256         for(int i=0;i<4;i++) {
257             long expected = allMessages.get(i).getEntries().get(0).getIndex();
258             assertEquals(expected, i+2);
259         }
260     }
261
262     @Test
263     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
264         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
265
266         MockRaftActorContext actorContext = createActorContextWithFollower();
267         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
268             @Override
269             public FiniteDuration getHeartBeatInterval() {
270                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
271             }
272         });
273
274         long term = 1;
275         actorContext.getTermInformation().update(term, "");
276
277         leader = new Leader(actorContext);
278
279         // Leader will send an immediate heartbeat - ignore it.
280         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
281
282         // The follower would normally reply - simulate that explicitly here.
283         long lastIndex = actorContext.getReplicatedLog().lastIndex();
284         leader.handleMessage(followerActor, new AppendEntriesReply(
285                 FOLLOWER_ID, term, true, lastIndex, term));
286         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
287
288         followerActor.underlyingActor().clear();
289
290         sendReplicate(actorContext, lastIndex+1);
291
292         // Wait slightly longer than heartbeat duration
293         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
294
295         leader.handleMessage(leaderActor, new SendHeartBeat());
296
297         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
298         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
299
300         assertEquals(1, allMessages.get(0).getEntries().size());
301         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
302         assertEquals(1, allMessages.get(1).getEntries().size());
303         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
304
305     }
306
307     @Test
308     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
309         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
310
311         MockRaftActorContext actorContext = createActorContextWithFollower();
312         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
313             @Override
314             public FiniteDuration getHeartBeatInterval() {
315                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
316             }
317         });
318
319         long term = 1;
320         actorContext.getTermInformation().update(term, "");
321
322         leader = new Leader(actorContext);
323
324         // Leader will send an immediate heartbeat - ignore it.
325         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
326
327         // The follower would normally reply - simulate that explicitly here.
328         long lastIndex = actorContext.getReplicatedLog().lastIndex();
329         leader.handleMessage(followerActor, new AppendEntriesReply(
330                 FOLLOWER_ID, term, true, lastIndex, term));
331         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
332
333         followerActor.underlyingActor().clear();
334
335         for(int i=0;i<3;i++) {
336             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
337             leader.handleMessage(leaderActor, new SendHeartBeat());
338         }
339
340         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
341         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
342     }
343
344     @Test
345     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
346         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
347
348         MockRaftActorContext actorContext = createActorContextWithFollower();
349         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
350             @Override
351             public FiniteDuration getHeartBeatInterval() {
352                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
353             }
354         });
355
356         long term = 1;
357         actorContext.getTermInformation().update(term, "");
358
359         leader = new Leader(actorContext);
360
361         // Leader will send an immediate heartbeat - ignore it.
362         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
363
364         // The follower would normally reply - simulate that explicitly here.
365         long lastIndex = actorContext.getReplicatedLog().lastIndex();
366         leader.handleMessage(followerActor, new AppendEntriesReply(
367                 FOLLOWER_ID, term, true, lastIndex, term));
368         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
369
370         followerActor.underlyingActor().clear();
371
372         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
373         leader.handleMessage(leaderActor, new SendHeartBeat());
374         sendReplicate(actorContext, lastIndex+1);
375
376         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
377         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
378
379         assertEquals(0, allMessages.get(0).getEntries().size());
380         assertEquals(1, allMessages.get(1).getEntries().size());
381     }
382
383
384     @Test
385     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
386         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
387
388         MockRaftActorContext actorContext = createActorContext();
389
390         leader = new Leader(actorContext);
391
392         actorContext.setLastApplied(0);
393
394         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
395         long term = actorContext.getTermInformation().getCurrentTerm();
396         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
397                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
398
399         actorContext.getReplicatedLog().append(newEntry);
400
401         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
402                 new Replicate(leaderActor, "state-id", newEntry));
403
404         // State should not change
405         assertTrue(raftBehavior instanceof Leader);
406
407         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
408
409         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
410         // one since lastApplied state is 0.
411         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
412                 leaderActor, ApplyState.class);
413         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
414
415         for(int i = 0; i <= newLogIndex - 1; i++ ) {
416             ApplyState applyState = applyStateList.get(i);
417             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
418             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
419         }
420
421         ApplyState last = applyStateList.get((int) newLogIndex - 1);
422         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
423         assertEquals("getIdentifier", "state-id", last.getIdentifier());
424     }
425
426     @Test
427     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
428         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
429
430         MockRaftActorContext actorContext = createActorContextWithFollower();
431
432         Map<String, String> leadersSnapshot = new HashMap<>();
433         leadersSnapshot.put("1", "A");
434         leadersSnapshot.put("2", "B");
435         leadersSnapshot.put("3", "C");
436
437         //clears leaders log
438         actorContext.getReplicatedLog().removeFrom(0);
439
440         final int followersLastIndex = 2;
441         final int snapshotIndex = 3;
442         final int newEntryIndex = 4;
443         final int snapshotTerm = 1;
444         final int currentTerm = 2;
445
446         // set the snapshot variables in replicatedlog
447         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
448         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
449         actorContext.setCommitIndex(followersLastIndex);
450         //set follower timeout to 2 mins, helps during debugging
451         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
452
453         leader = new Leader(actorContext);
454
455         // new entry
456         ReplicatedLogImplEntry entry =
457                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
458                         new MockRaftActorContext.MockPayload("D"));
459
460         //update follower timestamp
461         leader.markFollowerActive(FOLLOWER_ID);
462
463         ByteString bs = toByteString(leadersSnapshot);
464         leader.setSnapshot(Optional.of(bs));
465         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
466         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
467
468         //send first chunk and no InstallSnapshotReply received yet
469         fts.getNextChunk();
470         fts.incrementChunkIndex();
471
472         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
473                 TimeUnit.MILLISECONDS);
474
475         leader.handleMessage(leaderActor, new SendHeartBeat());
476
477         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
478
479         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
480
481         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
482
483         //InstallSnapshotReply received
484         fts.markSendStatus(true);
485
486         leader.handleMessage(leaderActor, new SendHeartBeat());
487
488         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
489
490         assertEquals(snapshotIndex, is.getLastIncludedIndex());
491     }
492
493     @Test
494     public void testSendAppendEntriesSnapshotScenario() throws Exception {
495         logStart("testSendAppendEntriesSnapshotScenario");
496
497         MockRaftActorContext actorContext = createActorContextWithFollower();
498
499         Map<String, String> leadersSnapshot = new HashMap<>();
500         leadersSnapshot.put("1", "A");
501         leadersSnapshot.put("2", "B");
502         leadersSnapshot.put("3", "C");
503
504         //clears leaders log
505         actorContext.getReplicatedLog().removeFrom(0);
506
507         final int followersLastIndex = 2;
508         final int snapshotIndex = 3;
509         final int newEntryIndex = 4;
510         final int snapshotTerm = 1;
511         final int currentTerm = 2;
512
513         // set the snapshot variables in replicatedlog
514         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
515         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
516         actorContext.setCommitIndex(followersLastIndex);
517
518         leader = new Leader(actorContext);
519
520         // Leader will send an immediate heartbeat - ignore it.
521         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
522
523         // new entry
524         ReplicatedLogImplEntry entry =
525                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
526                         new MockRaftActorContext.MockPayload("D"));
527
528         actorContext.getReplicatedLog().append(entry);
529
530         //update follower timestamp
531         leader.markFollowerActive(FOLLOWER_ID);
532
533         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
534         RaftActorBehavior raftBehavior = leader.handleMessage(
535                 leaderActor, new Replicate(null, "state-id", entry));
536
537         assertTrue(raftBehavior instanceof Leader);
538
539         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
540     }
541
542     @Test
543     public void testInitiateInstallSnapshot() throws Exception {
544         logStart("testInitiateInstallSnapshot");
545
546         MockRaftActorContext actorContext = createActorContextWithFollower();
547
548         Map<String, String> leadersSnapshot = new HashMap<>();
549         leadersSnapshot.put("1", "A");
550         leadersSnapshot.put("2", "B");
551         leadersSnapshot.put("3", "C");
552
553         //clears leaders log
554         actorContext.getReplicatedLog().removeFrom(0);
555
556         final int followersLastIndex = 2;
557         final int snapshotIndex = 3;
558         final int newEntryIndex = 4;
559         final int snapshotTerm = 1;
560         final int currentTerm = 2;
561
562         // set the snapshot variables in replicatedlog
563         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
564         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
565         actorContext.setLastApplied(3);
566         actorContext.setCommitIndex(followersLastIndex);
567
568         leader = new Leader(actorContext);
569
570         // Leader will send an immediate heartbeat - ignore it.
571         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
572
573         // set the snapshot as absent and check if capture-snapshot is invoked.
574         leader.setSnapshot(Optional.<ByteString>absent());
575
576         // new entry
577         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
578                 new MockRaftActorContext.MockPayload("D"));
579
580         actorContext.getReplicatedLog().append(entry);
581
582         //update follower timestamp
583         leader.markFollowerActive(FOLLOWER_ID);
584
585         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
586
587         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
588
589         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
590
591         assertTrue(cs.isInstallSnapshotInitiated());
592         assertEquals(3, cs.getLastAppliedIndex());
593         assertEquals(1, cs.getLastAppliedTerm());
594         assertEquals(4, cs.getLastIndex());
595         assertEquals(2, cs.getLastTerm());
596
597         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
598         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
599
600         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
601     }
602
603     @Test
604     public void testInstallSnapshot() throws Exception {
605         logStart("testInstallSnapshot");
606
607         MockRaftActorContext actorContext = createActorContextWithFollower();
608
609         Map<String, String> leadersSnapshot = new HashMap<>();
610         leadersSnapshot.put("1", "A");
611         leadersSnapshot.put("2", "B");
612         leadersSnapshot.put("3", "C");
613
614         //clears leaders log
615         actorContext.getReplicatedLog().removeFrom(0);
616
617         final int followersLastIndex = 2;
618         final int snapshotIndex = 3;
619         final int snapshotTerm = 1;
620         final int currentTerm = 2;
621
622         // set the snapshot variables in replicatedlog
623         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
624         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
625         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
626         actorContext.setCommitIndex(followersLastIndex);
627
628         leader = new Leader(actorContext);
629
630         // Ignore initial heartbeat.
631         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
632
633         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
634                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
635
636         assertTrue(raftBehavior instanceof Leader);
637
638         // check if installsnapshot gets called with the correct values.
639
640         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
641
642         assertNotNull(installSnapshot.getData());
643         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
644         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
645
646         assertEquals(currentTerm, installSnapshot.getTerm());
647     }
648
649     @Test
650     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
651         logStart("testHandleInstallSnapshotReplyLastChunk");
652
653         MockRaftActorContext actorContext = createActorContextWithFollower();
654
655         final int followersLastIndex = 2;
656         final int snapshotIndex = 3;
657         final int snapshotTerm = 1;
658         final int currentTerm = 2;
659
660         actorContext.setCommitIndex(followersLastIndex);
661
662         leader = new Leader(actorContext);
663
664         // Ignore initial heartbeat.
665         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
666
667         Map<String, String> leadersSnapshot = new HashMap<>();
668         leadersSnapshot.put("1", "A");
669         leadersSnapshot.put("2", "B");
670         leadersSnapshot.put("3", "C");
671
672         // set the snapshot variables in replicatedlog
673
674         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
675         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
676         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
677
678         ByteString bs = toByteString(leadersSnapshot);
679         leader.setSnapshot(Optional.of(bs));
680         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
681         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
682         while(!fts.isLastChunk(fts.getChunkIndex())) {
683             fts.getNextChunk();
684             fts.incrementChunkIndex();
685         }
686
687         //clears leaders log
688         actorContext.getReplicatedLog().removeFrom(0);
689
690         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
691                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
692
693         assertTrue(raftBehavior instanceof Leader);
694
695         assertEquals(0, leader.followerSnapshotSize());
696         assertEquals(1, leader.followerLogSize());
697         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
698         assertNotNull(fli);
699         assertEquals(snapshotIndex, fli.getMatchIndex());
700         assertEquals(snapshotIndex, fli.getMatchIndex());
701         assertEquals(snapshotIndex + 1, fli.getNextIndex());
702     }
703
704     @Test
705     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
706         logStart("testSendSnapshotfromInstallSnapshotReply");
707
708         MockRaftActorContext actorContext = createActorContextWithFollower();
709
710         final int followersLastIndex = 2;
711         final int snapshotIndex = 3;
712         final int snapshotTerm = 1;
713         final int currentTerm = 2;
714
715         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
716             @Override
717             public int getSnapshotChunkSize() {
718                 return 50;
719             }
720         };
721         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
722         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
723
724         actorContext.setConfigParams(configParams);
725         actorContext.setCommitIndex(followersLastIndex);
726
727         leader = new Leader(actorContext);
728
729         Map<String, String> leadersSnapshot = new HashMap<>();
730         leadersSnapshot.put("1", "A");
731         leadersSnapshot.put("2", "B");
732         leadersSnapshot.put("3", "C");
733
734         // set the snapshot variables in replicatedlog
735         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
736         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
737         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
738
739         ByteString bs = toByteString(leadersSnapshot);
740         leader.setSnapshot(Optional.of(bs));
741
742         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
743
744         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
745
746         assertEquals(1, installSnapshot.getChunkIndex());
747         assertEquals(3, installSnapshot.getTotalChunks());
748
749         followerActor.underlyingActor().clear();
750         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
751                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
752
753         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
754
755         assertEquals(2, installSnapshot.getChunkIndex());
756         assertEquals(3, installSnapshot.getTotalChunks());
757
758         followerActor.underlyingActor().clear();
759         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
760                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
761
762         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
763
764         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
765         followerActor.underlyingActor().clear();
766         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
767                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
768
769         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
770
771         Assert.assertNull(installSnapshot);
772     }
773
774
775     @Test
776     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
777         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
778
779         MockRaftActorContext actorContext = createActorContextWithFollower();
780
781         final int followersLastIndex = 2;
782         final int snapshotIndex = 3;
783         final int snapshotTerm = 1;
784         final int currentTerm = 2;
785
786         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
787             @Override
788             public int getSnapshotChunkSize() {
789                 return 50;
790             }
791         });
792
793         actorContext.setCommitIndex(followersLastIndex);
794
795         leader = new Leader(actorContext);
796
797         Map<String, String> leadersSnapshot = new HashMap<>();
798         leadersSnapshot.put("1", "A");
799         leadersSnapshot.put("2", "B");
800         leadersSnapshot.put("3", "C");
801
802         // set the snapshot variables in replicatedlog
803         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
804         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
805         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
806
807         ByteString bs = toByteString(leadersSnapshot);
808         leader.setSnapshot(Optional.of(bs));
809
810         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
811         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
812
813         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
814
815         assertEquals(1, installSnapshot.getChunkIndex());
816         assertEquals(3, installSnapshot.getTotalChunks());
817
818         followerActor.underlyingActor().clear();
819
820         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
821                 FOLLOWER_ID, -1, false));
822
823         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
824                 TimeUnit.MILLISECONDS);
825
826         leader.handleMessage(leaderActor, new SendHeartBeat());
827
828         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
829
830         assertEquals(1, installSnapshot.getChunkIndex());
831         assertEquals(3, installSnapshot.getTotalChunks());
832     }
833
834     @Test
835     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
836         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
837
838         MockRaftActorContext actorContext = createActorContextWithFollower();
839
840         final int followersLastIndex = 2;
841         final int snapshotIndex = 3;
842         final int snapshotTerm = 1;
843         final int currentTerm = 2;
844
845         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
846             @Override
847             public int getSnapshotChunkSize() {
848                 return 50;
849             }
850         });
851
852         actorContext.setCommitIndex(followersLastIndex);
853
854         leader = new Leader(actorContext);
855
856         Map<String, String> leadersSnapshot = new HashMap<>();
857         leadersSnapshot.put("1", "A");
858         leadersSnapshot.put("2", "B");
859         leadersSnapshot.put("3", "C");
860
861         // set the snapshot variables in replicatedlog
862         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
863         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
864         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
865
866         ByteString bs = toByteString(leadersSnapshot);
867         leader.setSnapshot(Optional.of(bs));
868
869         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
870
871         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
872
873         assertEquals(1, installSnapshot.getChunkIndex());
874         assertEquals(3, installSnapshot.getTotalChunks());
875         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
876
877         int hashCode = installSnapshot.getData().hashCode();
878
879         followerActor.underlyingActor().clear();
880
881         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
882                 FOLLOWER_ID, 1, true));
883
884         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
885
886         assertEquals(2, installSnapshot.getChunkIndex());
887         assertEquals(3, installSnapshot.getTotalChunks());
888         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
889     }
890
891     @Test
892     public void testFollowerToSnapshotLogic() {
893         logStart("testFollowerToSnapshotLogic");
894
895         MockRaftActorContext actorContext = createActorContext();
896
897         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
898             @Override
899             public int getSnapshotChunkSize() {
900                 return 50;
901             }
902         });
903
904         leader = new Leader(actorContext);
905
906         Map<String, String> leadersSnapshot = new HashMap<>();
907         leadersSnapshot.put("1", "A");
908         leadersSnapshot.put("2", "B");
909         leadersSnapshot.put("3", "C");
910
911         ByteString bs = toByteString(leadersSnapshot);
912         byte[] barray = bs.toByteArray();
913
914         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
915         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
916
917         assertEquals(bs.size(), barray.length);
918
919         int chunkIndex=0;
920         for (int i=0; i < barray.length; i = i + 50) {
921             int j = i + 50;
922             chunkIndex++;
923
924             if (i + 50 > barray.length) {
925                 j = barray.length;
926             }
927
928             ByteString chunk = fts.getNextChunk();
929             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
930             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
931
932             fts.markSendStatus(true);
933             if (!fts.isLastChunk(chunkIndex)) {
934                 fts.incrementChunkIndex();
935             }
936         }
937
938         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
939     }
940
941     @Override protected RaftActorBehavior createBehavior(
942         RaftActorContext actorContext) {
943         return new Leader(actorContext);
944     }
945
946     @Override
947     protected MockRaftActorContext createActorContext() {
948         return createActorContext(leaderActor);
949     }
950
951     @Override
952     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
953         return createActorContext(LEADER_ID, actorRef);
954     }
955
956     private MockRaftActorContext createActorContextWithFollower() {
957         MockRaftActorContext actorContext = createActorContext();
958         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
959                 followerActor.path().toString()).build());
960         return actorContext;
961     }
962
963     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
964         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
965         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
966         configParams.setElectionTimeoutFactor(100000);
967         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
968         context.setConfigParams(configParams);
969         return context;
970     }
971
972     @Test
973     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
974         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
975
976         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
977
978         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
979
980         Follower follower = new Follower(followerActorContext);
981         followerActor.underlyingActor().setBehavior(follower);
982
983         Map<String, String> peerAddresses = new HashMap<>();
984         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
985
986         leaderActorContext.setPeerAddresses(peerAddresses);
987
988         leaderActorContext.getReplicatedLog().removeFrom(0);
989
990         //create 3 entries
991         leaderActorContext.setReplicatedLog(
992                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
993
994         leaderActorContext.setCommitIndex(1);
995
996         followerActorContext.getReplicatedLog().removeFrom(0);
997
998         // follower too has the exact same log entries and has the same commit index
999         followerActorContext.setReplicatedLog(
1000                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1001
1002         followerActorContext.setCommitIndex(1);
1003
1004         leader = new Leader(leaderActorContext);
1005
1006         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1007
1008         assertEquals(1, appendEntries.getLeaderCommit());
1009         assertEquals(0, appendEntries.getEntries().size());
1010         assertEquals(0, appendEntries.getPrevLogIndex());
1011
1012         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1013                 leaderActor, AppendEntriesReply.class);
1014
1015         assertEquals(2, appendEntriesReply.getLogLastIndex());
1016         assertEquals(1, appendEntriesReply.getLogLastTerm());
1017
1018         // follower returns its next index
1019         assertEquals(2, appendEntriesReply.getLogLastIndex());
1020         assertEquals(1, appendEntriesReply.getLogLastTerm());
1021
1022         follower.close();
1023     }
1024
1025     @Test
1026     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1027         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1028
1029         MockRaftActorContext leaderActorContext = createActorContext();
1030
1031         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1032         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1033
1034         Follower follower = new Follower(followerActorContext);
1035         followerActor.underlyingActor().setBehavior(follower);
1036
1037         Map<String, String> leaderPeerAddresses = new HashMap<>();
1038         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1039
1040         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1041
1042         leaderActorContext.getReplicatedLog().removeFrom(0);
1043
1044         leaderActorContext.setReplicatedLog(
1045                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1046
1047         leaderActorContext.setCommitIndex(1);
1048
1049         followerActorContext.getReplicatedLog().removeFrom(0);
1050
1051         followerActorContext.setReplicatedLog(
1052                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1053
1054         // follower has the same log entries but its commit index > leaders commit index
1055         followerActorContext.setCommitIndex(2);
1056
1057         leader = new Leader(leaderActorContext);
1058
1059         // Initial heartbeat
1060         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1061
1062         assertEquals(1, appendEntries.getLeaderCommit());
1063         assertEquals(0, appendEntries.getEntries().size());
1064         assertEquals(0, appendEntries.getPrevLogIndex());
1065
1066         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1067                 leaderActor, AppendEntriesReply.class);
1068
1069         assertEquals(2, appendEntriesReply.getLogLastIndex());
1070         assertEquals(1, appendEntriesReply.getLogLastTerm());
1071
1072         leaderActor.underlyingActor().setBehavior(follower);
1073         leader.handleMessage(followerActor, appendEntriesReply);
1074
1075         leaderActor.underlyingActor().clear();
1076         followerActor.underlyingActor().clear();
1077
1078         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1079                 TimeUnit.MILLISECONDS);
1080
1081         leader.handleMessage(leaderActor, new SendHeartBeat());
1082
1083         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1084
1085         assertEquals(2, appendEntries.getLeaderCommit());
1086         assertEquals(0, appendEntries.getEntries().size());
1087         assertEquals(2, appendEntries.getPrevLogIndex());
1088
1089         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1090
1091         assertEquals(2, appendEntriesReply.getLogLastIndex());
1092         assertEquals(1, appendEntriesReply.getLogLastTerm());
1093
1094         assertEquals(2, followerActorContext.getCommitIndex());
1095
1096         follower.close();
1097     }
1098
1099     @Test
1100     public void testHandleAppendEntriesReplyFailure(){
1101         logStart("testHandleAppendEntriesReplyFailure");
1102
1103         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1104
1105         leader = new Leader(leaderActorContext);
1106
1107         // Send initial heartbeat reply with last index.
1108         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
1109
1110         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1111         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1112
1113         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
1114
1115         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1116
1117         assertEquals(RaftState.Leader, raftActorBehavior.state());
1118
1119         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1120     }
1121
1122     @Test
1123     public void testHandleAppendEntriesReplySuccess() throws Exception {
1124         logStart("testHandleAppendEntriesReplySuccess");
1125
1126         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1127
1128         leaderActorContext.setReplicatedLog(
1129                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1130
1131         leaderActorContext.setCommitIndex(1);
1132         leaderActorContext.setLastApplied(1);
1133         leaderActorContext.getTermInformation().update(1, "leader");
1134
1135         leader = new Leader(leaderActorContext);
1136
1137         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
1138
1139         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1140
1141         assertEquals(RaftState.Leader, raftActorBehavior.state());
1142
1143         assertEquals(2, leaderActorContext.getCommitIndex());
1144
1145         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1146                 leaderActor, ApplyJournalEntries.class);
1147
1148         assertEquals(2, leaderActorContext.getLastApplied());
1149
1150         assertEquals(2, applyJournalEntries.getToIndex());
1151
1152         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1153                 ApplyState.class);
1154
1155         assertEquals(1,applyStateList.size());
1156
1157         ApplyState applyState = applyStateList.get(0);
1158
1159         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1160     }
1161
1162     @Test
1163     public void testHandleAppendEntriesReplyUnknownFollower(){
1164         logStart("testHandleAppendEntriesReplyUnknownFollower");
1165
1166         MockRaftActorContext leaderActorContext = createActorContext();
1167
1168         leader = new Leader(leaderActorContext);
1169
1170         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
1171
1172         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1173
1174         assertEquals(RaftState.Leader, raftActorBehavior.state());
1175     }
1176
1177     @Test
1178     public void testHandleRequestVoteReply(){
1179         logStart("testHandleRequestVoteReply");
1180
1181         MockRaftActorContext leaderActorContext = createActorContext();
1182
1183         leader = new Leader(leaderActorContext);
1184
1185         // Should be a no-op.
1186         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1187                 new RequestVoteReply(1, true));
1188
1189         assertEquals(RaftState.Leader, raftActorBehavior.state());
1190
1191         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1192
1193         assertEquals(RaftState.Leader, raftActorBehavior.state());
1194     }
1195
1196     @Test
1197     public void testIsolatedLeaderCheckNoFollowers() {
1198         logStart("testIsolatedLeaderCheckNoFollowers");
1199
1200         MockRaftActorContext leaderActorContext = createActorContext();
1201
1202         leader = new Leader(leaderActorContext);
1203         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1204         Assert.assertTrue(behavior instanceof Leader);
1205     }
1206
1207     @Test
1208     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1209         logStart("testIsolatedLeaderCheckTwoFollowers");
1210
1211         new JavaTestKit(getSystem()) {{
1212
1213             ActorRef followerActor1 = getTestActor();
1214             ActorRef followerActor2 = getTestActor();
1215
1216             MockRaftActorContext leaderActorContext = createActorContext();
1217
1218             Map<String, String> peerAddresses = new HashMap<>();
1219             peerAddresses.put("follower-1", followerActor1.path().toString());
1220             peerAddresses.put("follower-2", followerActor2.path().toString());
1221
1222             leaderActorContext.setPeerAddresses(peerAddresses);
1223
1224             leader = new Leader(leaderActorContext);
1225
1226             leader.markFollowerActive("follower-1");
1227             leader.markFollowerActive("follower-2");
1228             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1229             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1230                 behavior instanceof Leader);
1231
1232             // kill 1 follower and verify if that got killed
1233             final JavaTestKit probe = new JavaTestKit(getSystem());
1234             probe.watch(followerActor1);
1235             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1236             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1237             assertEquals(termMsg1.getActor(), followerActor1);
1238
1239             leader.markFollowerInActive("follower-1");
1240             leader.markFollowerActive("follower-2");
1241             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1242             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1243                 behavior instanceof Leader);
1244
1245             // kill 2nd follower and leader should change to Isolated leader
1246             followerActor2.tell(PoisonPill.getInstance(), null);
1247             probe.watch(followerActor2);
1248             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1249             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1250             assertEquals(termMsg2.getActor(), followerActor2);
1251
1252             leader.markFollowerInActive("follower-2");
1253             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1254             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1255                 behavior instanceof IsolatedLeader);
1256         }};
1257     }
1258
1259
1260     @Test
1261     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1262         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1263
1264         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1265
1266         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1267         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1268         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1269
1270         leaderActorContext.setConfigParams(configParams);
1271
1272         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1273
1274         followerActorContext.setConfigParams(configParams);
1275         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1276
1277         Follower follower = new Follower(followerActorContext);
1278         followerActor.underlyingActor().setBehavior(follower);
1279
1280         leaderActorContext.getReplicatedLog().removeFrom(0);
1281         leaderActorContext.setCommitIndex(-1);
1282         leaderActorContext.setLastApplied(-1);
1283
1284         followerActorContext.getReplicatedLog().removeFrom(0);
1285         followerActorContext.setCommitIndex(-1);
1286         followerActorContext.setLastApplied(-1);
1287
1288         leader = new Leader(leaderActorContext);
1289
1290         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1291                 leaderActor, AppendEntriesReply.class);
1292
1293         leader.handleMessage(followerActor, appendEntriesReply);
1294
1295         // Clear initial heartbeat messages
1296
1297         leaderActor.underlyingActor().clear();
1298         followerActor.underlyingActor().clear();
1299
1300         // create 3 entries
1301         leaderActorContext.setReplicatedLog(
1302                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1303         leaderActorContext.setCommitIndex(1);
1304         leaderActorContext.setLastApplied(1);
1305
1306         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1307                 TimeUnit.MILLISECONDS);
1308
1309         leader.handleMessage(leaderActor, new SendHeartBeat());
1310
1311         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1312
1313         // Should send first log entry
1314         assertEquals(1, appendEntries.getLeaderCommit());
1315         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1316         assertEquals(-1, appendEntries.getPrevLogIndex());
1317
1318         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1319
1320         assertEquals(1, appendEntriesReply.getLogLastTerm());
1321         assertEquals(0, appendEntriesReply.getLogLastIndex());
1322
1323         followerActor.underlyingActor().clear();
1324
1325         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1326
1327         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1328
1329         // Should send second log entry
1330         assertEquals(1, appendEntries.getLeaderCommit());
1331         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1332
1333         follower.close();
1334     }
1335
1336     @Test
1337     public void testLaggingFollowerStarvation() throws Exception {
1338         logStart("testLaggingFollowerStarvation");
1339         new JavaTestKit(getSystem()) {{
1340             String leaderActorId = actorFactory.generateActorId("leader");
1341             String follower1ActorId = actorFactory.generateActorId("follower");
1342             String follower2ActorId = actorFactory.generateActorId("follower");
1343
1344             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1345                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1346             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1347             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1348
1349             MockRaftActorContext leaderActorContext =
1350                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1351
1352             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1353             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1354             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1355
1356             leaderActorContext.setConfigParams(configParams);
1357
1358             leaderActorContext.setReplicatedLog(
1359                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1360
1361             Map<String, String> peerAddresses = new HashMap<>();
1362             peerAddresses.put(follower1ActorId,
1363                     follower1Actor.path().toString());
1364             peerAddresses.put(follower2ActorId,
1365                     follower2Actor.path().toString());
1366
1367             leaderActorContext.setPeerAddresses(peerAddresses);
1368             leaderActorContext.getTermInformation().update(1, leaderActorId);
1369
1370             RaftActorBehavior leader = createBehavior(leaderActorContext);
1371
1372             leaderActor.underlyingActor().setBehavior(leader);
1373
1374             for(int i=1;i<6;i++) {
1375                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1376                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1));
1377                 assertTrue(newBehavior == leader);
1378                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1379             }
1380
1381             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1382             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1383
1384             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1385                     heartbeats.size() > 1);
1386
1387             // Check if follower-2 got AppendEntries during this time and was not starved
1388             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1389
1390             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1391                     appendEntries.size() > 1);
1392
1393         }};
1394     }
1395
1396     @Override
1397     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1398             ActorRef actorRef, RaftRPC rpc) throws Exception {
1399         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1400         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1401     }
1402
1403     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1404
1405         private final long electionTimeOutIntervalMillis;
1406         private final int snapshotChunkSize;
1407
1408         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1409             super();
1410             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1411             this.snapshotChunkSize = snapshotChunkSize;
1412         }
1413
1414         @Override
1415         public FiniteDuration getElectionTimeOutInterval() {
1416             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1417         }
1418
1419         @Override
1420         public int getSnapshotChunkSize() {
1421             return snapshotChunkSize;
1422         }
1423     }
1424 }