Bug 3020: Add leader version to LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
30 import org.opendaylight.controller.cluster.raft.SerializationUtils;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
34 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
35 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
38 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
43 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
46 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class LeaderTest extends AbstractLeaderTest {
50
51     static final String FOLLOWER_ID = "follower";
52     public static final String LEADER_ID = "leader";
53
54     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
55             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
56
57     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
58             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
59
60     private Leader leader;
61     private final short payloadVersion = 5;
62
63     @Override
64     @After
65     public void tearDown() throws Exception {
66         if(leader != null) {
67             leader.close();
68         }
69
70         super.tearDown();
71     }
72
73     @Test
74     public void testHandleMessageForUnknownMessage() throws Exception {
75         logStart("testHandleMessageForUnknownMessage");
76
77         leader = new Leader(createActorContext());
78
79         // handle message should return the Leader state when it receives an
80         // unknown message
81         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
82         Assert.assertTrue(behavior instanceof Leader);
83     }
84
85     @Test
86     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
87         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
88
89         MockRaftActorContext actorContext = createActorContextWithFollower();
90         short payloadVersion = (short)5;
91         actorContext.setPayloadVersion(payloadVersion);
92
93         long term = 1;
94         actorContext.getTermInformation().update(term, "");
95
96         leader = new Leader(actorContext);
97
98         // Leader should send an immediate heartbeat with no entries as follower is inactive.
99         long lastIndex = actorContext.getReplicatedLog().lastIndex();
100         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
101         assertEquals("getTerm", term, appendEntries.getTerm());
102         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
103         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
104         assertEquals("Entries size", 0, appendEntries.getEntries().size());
105         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
106
107         // The follower would normally reply - simulate that explicitly here.
108         leader.handleMessage(followerActor, new AppendEntriesReply(
109                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
110         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
111
112         followerActor.underlyingActor().clear();
113
114         // Sleep for the heartbeat interval so AppendEntries is sent.
115         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
116                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
117
118         leader.handleMessage(leaderActor, new SendHeartBeat());
119
120         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
121         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
122         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
123         assertEquals("Entries size", 1, appendEntries.getEntries().size());
124         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
125         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
126         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
127     }
128
129
130     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
131         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
132         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
133                 1, index, payload);
134         actorContext.getReplicatedLog().append(newEntry);
135         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
136     }
137
138     @Test
139     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
140         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
141
142         MockRaftActorContext actorContext = createActorContextWithFollower();
143
144         long term = 1;
145         actorContext.getTermInformation().update(term, "");
146
147         leader = new Leader(actorContext);
148
149         // Leader will send an immediate heartbeat - ignore it.
150         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
151
152         // The follower would normally reply - simulate that explicitly here.
153         long lastIndex = actorContext.getReplicatedLog().lastIndex();
154         leader.handleMessage(followerActor, new AppendEntriesReply(
155                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
156         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
157
158         followerActor.underlyingActor().clear();
159
160         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
161
162         // State should not change
163         assertTrue(raftBehavior instanceof Leader);
164
165         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
166         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
167         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
168         assertEquals("Entries size", 1, appendEntries.getEntries().size());
169         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
170         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
171         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
172     }
173
174     @Test
175     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
176         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
177
178         MockRaftActorContext actorContext = createActorContextWithFollower();
179         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
180             @Override
181             public FiniteDuration getHeartBeatInterval() {
182                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
183             }
184         });
185
186         long term = 1;
187         actorContext.getTermInformation().update(term, "");
188
189         leader = new Leader(actorContext);
190
191         // Leader will send an immediate heartbeat - ignore it.
192         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
193
194         // The follower would normally reply - simulate that explicitly here.
195         long lastIndex = actorContext.getReplicatedLog().lastIndex();
196         leader.handleMessage(followerActor, new AppendEntriesReply(
197                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
198         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
199
200         followerActor.underlyingActor().clear();
201
202         for(int i=0;i<5;i++) {
203             sendReplicate(actorContext, lastIndex+i+1);
204         }
205
206         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
207         // We expect only 1 message to be sent because of two reasons,
208         // - an append entries reply was not received
209         // - the heartbeat interval has not expired
210         // In this scenario if multiple messages are sent they would likely be duplicates
211         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
212     }
213
214     @Test
215     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
216         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
217
218         MockRaftActorContext actorContext = createActorContextWithFollower();
219         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
220             @Override
221             public FiniteDuration getHeartBeatInterval() {
222                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
223             }
224         });
225
226         long term = 1;
227         actorContext.getTermInformation().update(term, "");
228
229         leader = new Leader(actorContext);
230
231         // Leader will send an immediate heartbeat - ignore it.
232         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233
234         // The follower would normally reply - simulate that explicitly here.
235         long lastIndex = actorContext.getReplicatedLog().lastIndex();
236         leader.handleMessage(followerActor, new AppendEntriesReply(
237                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
238         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
239
240         followerActor.underlyingActor().clear();
241
242         for(int i=0;i<3;i++) {
243             sendReplicate(actorContext, lastIndex+i+1);
244             leader.handleMessage(followerActor, new AppendEntriesReply(
245                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
246
247         }
248
249         for(int i=3;i<5;i++) {
250             sendReplicate(actorContext, lastIndex + i + 1);
251         }
252
253         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
254         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
255         // get sent to the follower - but not the 5th
256         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
257
258         for(int i=0;i<4;i++) {
259             long expected = allMessages.get(i).getEntries().get(0).getIndex();
260             assertEquals(expected, i+2);
261         }
262     }
263
264     @Test
265     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
266         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
267
268         MockRaftActorContext actorContext = createActorContextWithFollower();
269         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
270             @Override
271             public FiniteDuration getHeartBeatInterval() {
272                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
273             }
274         });
275
276         long term = 1;
277         actorContext.getTermInformation().update(term, "");
278
279         leader = new Leader(actorContext);
280
281         // Leader will send an immediate heartbeat - ignore it.
282         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
283
284         // The follower would normally reply - simulate that explicitly here.
285         long lastIndex = actorContext.getReplicatedLog().lastIndex();
286         leader.handleMessage(followerActor, new AppendEntriesReply(
287                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
288         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
289
290         followerActor.underlyingActor().clear();
291
292         sendReplicate(actorContext, lastIndex+1);
293
294         // Wait slightly longer than heartbeat duration
295         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
296
297         leader.handleMessage(leaderActor, new SendHeartBeat());
298
299         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
300         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
301
302         assertEquals(1, allMessages.get(0).getEntries().size());
303         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
304         assertEquals(1, allMessages.get(1).getEntries().size());
305         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
306
307     }
308
309     @Test
310     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
311         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
312
313         MockRaftActorContext actorContext = createActorContextWithFollower();
314         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
315             @Override
316             public FiniteDuration getHeartBeatInterval() {
317                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
318             }
319         });
320
321         long term = 1;
322         actorContext.getTermInformation().update(term, "");
323
324         leader = new Leader(actorContext);
325
326         // Leader will send an immediate heartbeat - ignore it.
327         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
328
329         // The follower would normally reply - simulate that explicitly here.
330         long lastIndex = actorContext.getReplicatedLog().lastIndex();
331         leader.handleMessage(followerActor, new AppendEntriesReply(
332                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
333         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
334
335         followerActor.underlyingActor().clear();
336
337         for(int i=0;i<3;i++) {
338             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
339             leader.handleMessage(leaderActor, new SendHeartBeat());
340         }
341
342         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
343         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
344     }
345
346     @Test
347     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
348         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
349
350         MockRaftActorContext actorContext = createActorContextWithFollower();
351         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352             @Override
353             public FiniteDuration getHeartBeatInterval() {
354                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
355             }
356         });
357
358         long term = 1;
359         actorContext.getTermInformation().update(term, "");
360
361         leader = new Leader(actorContext);
362
363         // Leader will send an immediate heartbeat - ignore it.
364         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365
366         // The follower would normally reply - simulate that explicitly here.
367         long lastIndex = actorContext.getReplicatedLog().lastIndex();
368         leader.handleMessage(followerActor, new AppendEntriesReply(
369                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371
372         followerActor.underlyingActor().clear();
373
374         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
375         leader.handleMessage(leaderActor, new SendHeartBeat());
376         sendReplicate(actorContext, lastIndex+1);
377
378         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
379         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
380
381         assertEquals(0, allMessages.get(0).getEntries().size());
382         assertEquals(1, allMessages.get(1).getEntries().size());
383     }
384
385
386     @Test
387     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
388         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
389
390         MockRaftActorContext actorContext = createActorContext();
391
392         leader = new Leader(actorContext);
393
394         actorContext.setLastApplied(0);
395
396         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
397         long term = actorContext.getTermInformation().getCurrentTerm();
398         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
399                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
400
401         actorContext.getReplicatedLog().append(newEntry);
402
403         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
404                 new Replicate(leaderActor, "state-id", newEntry));
405
406         // State should not change
407         assertTrue(raftBehavior instanceof Leader);
408
409         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
410
411         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
412         // one since lastApplied state is 0.
413         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
414                 leaderActor, ApplyState.class);
415         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
416
417         for(int i = 0; i <= newLogIndex - 1; i++ ) {
418             ApplyState applyState = applyStateList.get(i);
419             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
420             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
421         }
422
423         ApplyState last = applyStateList.get((int) newLogIndex - 1);
424         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
425         assertEquals("getIdentifier", "state-id", last.getIdentifier());
426     }
427
428     @Test
429     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
430         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
431
432         MockRaftActorContext actorContext = createActorContextWithFollower();
433
434         Map<String, String> leadersSnapshot = new HashMap<>();
435         leadersSnapshot.put("1", "A");
436         leadersSnapshot.put("2", "B");
437         leadersSnapshot.put("3", "C");
438
439         //clears leaders log
440         actorContext.getReplicatedLog().removeFrom(0);
441
442         final int followersLastIndex = 2;
443         final int snapshotIndex = 3;
444         final int newEntryIndex = 4;
445         final int snapshotTerm = 1;
446         final int currentTerm = 2;
447
448         // set the snapshot variables in replicatedlog
449         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
450         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
451         actorContext.setCommitIndex(followersLastIndex);
452         //set follower timeout to 2 mins, helps during debugging
453         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
454
455         leader = new Leader(actorContext);
456
457         // new entry
458         ReplicatedLogImplEntry entry =
459                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
460                         new MockRaftActorContext.MockPayload("D"));
461
462         //update follower timestamp
463         leader.markFollowerActive(FOLLOWER_ID);
464
465         ByteString bs = toByteString(leadersSnapshot);
466         leader.setSnapshot(Optional.of(bs));
467         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
468         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
469
470         //send first chunk and no InstallSnapshotReply received yet
471         fts.getNextChunk();
472         fts.incrementChunkIndex();
473
474         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
475                 TimeUnit.MILLISECONDS);
476
477         leader.handleMessage(leaderActor, new SendHeartBeat());
478
479         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
480
481         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
482
483         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
484
485         //InstallSnapshotReply received
486         fts.markSendStatus(true);
487
488         leader.handleMessage(leaderActor, new SendHeartBeat());
489
490         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
491
492         assertEquals(snapshotIndex, is.getLastIncludedIndex());
493     }
494
495     @Test
496     public void testSendAppendEntriesSnapshotScenario() throws Exception {
497         logStart("testSendAppendEntriesSnapshotScenario");
498
499         MockRaftActorContext actorContext = createActorContextWithFollower();
500
501         Map<String, String> leadersSnapshot = new HashMap<>();
502         leadersSnapshot.put("1", "A");
503         leadersSnapshot.put("2", "B");
504         leadersSnapshot.put("3", "C");
505
506         //clears leaders log
507         actorContext.getReplicatedLog().removeFrom(0);
508
509         final int followersLastIndex = 2;
510         final int snapshotIndex = 3;
511         final int newEntryIndex = 4;
512         final int snapshotTerm = 1;
513         final int currentTerm = 2;
514
515         // set the snapshot variables in replicatedlog
516         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
517         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
518         actorContext.setCommitIndex(followersLastIndex);
519
520         leader = new Leader(actorContext);
521
522         // Leader will send an immediate heartbeat - ignore it.
523         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
524
525         // new entry
526         ReplicatedLogImplEntry entry =
527                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
528                         new MockRaftActorContext.MockPayload("D"));
529
530         actorContext.getReplicatedLog().append(entry);
531
532         //update follower timestamp
533         leader.markFollowerActive(FOLLOWER_ID);
534
535         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
536         RaftActorBehavior raftBehavior = leader.handleMessage(
537                 leaderActor, new Replicate(null, "state-id", entry));
538
539         assertTrue(raftBehavior instanceof Leader);
540
541         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
542     }
543
544     @Test
545     public void testInitiateInstallSnapshot() throws Exception {
546         logStart("testInitiateInstallSnapshot");
547
548         MockRaftActorContext actorContext = createActorContextWithFollower();
549
550         Map<String, String> leadersSnapshot = new HashMap<>();
551         leadersSnapshot.put("1", "A");
552         leadersSnapshot.put("2", "B");
553         leadersSnapshot.put("3", "C");
554
555         //clears leaders log
556         actorContext.getReplicatedLog().removeFrom(0);
557
558         final int followersLastIndex = 2;
559         final int snapshotIndex = 3;
560         final int newEntryIndex = 4;
561         final int snapshotTerm = 1;
562         final int currentTerm = 2;
563
564         // set the snapshot variables in replicatedlog
565         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
566         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
567         actorContext.setLastApplied(3);
568         actorContext.setCommitIndex(followersLastIndex);
569
570         leader = new Leader(actorContext);
571
572         // Leader will send an immediate heartbeat - ignore it.
573         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
574
575         // set the snapshot as absent and check if capture-snapshot is invoked.
576         leader.setSnapshot(Optional.<ByteString>absent());
577
578         // new entry
579         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
580                 new MockRaftActorContext.MockPayload("D"));
581
582         actorContext.getReplicatedLog().append(entry);
583
584         //update follower timestamp
585         leader.markFollowerActive(FOLLOWER_ID);
586
587         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
588
589         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
590
591         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
592
593         assertTrue(cs.isInstallSnapshotInitiated());
594         assertEquals(3, cs.getLastAppliedIndex());
595         assertEquals(1, cs.getLastAppliedTerm());
596         assertEquals(4, cs.getLastIndex());
597         assertEquals(2, cs.getLastTerm());
598
599         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
600         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
601
602         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
603     }
604
605     @Test
606     public void testInstallSnapshot() throws Exception {
607         logStart("testInstallSnapshot");
608
609         MockRaftActorContext actorContext = createActorContextWithFollower();
610
611         Map<String, String> leadersSnapshot = new HashMap<>();
612         leadersSnapshot.put("1", "A");
613         leadersSnapshot.put("2", "B");
614         leadersSnapshot.put("3", "C");
615
616         //clears leaders log
617         actorContext.getReplicatedLog().removeFrom(0);
618
619         final int followersLastIndex = 2;
620         final int snapshotIndex = 3;
621         final int snapshotTerm = 1;
622         final int currentTerm = 2;
623
624         // set the snapshot variables in replicatedlog
625         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
626         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
627         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
628         actorContext.setCommitIndex(followersLastIndex);
629
630         leader = new Leader(actorContext);
631
632         // Ignore initial heartbeat.
633         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
634
635         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
636                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
637
638         assertTrue(raftBehavior instanceof Leader);
639
640         // check if installsnapshot gets called with the correct values.
641
642         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
643
644         assertNotNull(installSnapshot.getData());
645         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
646         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
647
648         assertEquals(currentTerm, installSnapshot.getTerm());
649     }
650
651     @Test
652     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
653         logStart("testHandleInstallSnapshotReplyLastChunk");
654
655         MockRaftActorContext actorContext = createActorContextWithFollower();
656
657         final int followersLastIndex = 2;
658         final int snapshotIndex = 3;
659         final int snapshotTerm = 1;
660         final int currentTerm = 2;
661
662         actorContext.setCommitIndex(followersLastIndex);
663
664         leader = new Leader(actorContext);
665
666         // Ignore initial heartbeat.
667         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
668
669         Map<String, String> leadersSnapshot = new HashMap<>();
670         leadersSnapshot.put("1", "A");
671         leadersSnapshot.put("2", "B");
672         leadersSnapshot.put("3", "C");
673
674         // set the snapshot variables in replicatedlog
675
676         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
677         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
678         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
679
680         ByteString bs = toByteString(leadersSnapshot);
681         leader.setSnapshot(Optional.of(bs));
682         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
683         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
684         while(!fts.isLastChunk(fts.getChunkIndex())) {
685             fts.getNextChunk();
686             fts.incrementChunkIndex();
687         }
688
689         //clears leaders log
690         actorContext.getReplicatedLog().removeFrom(0);
691
692         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
693                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
694
695         assertTrue(raftBehavior instanceof Leader);
696
697         assertEquals(0, leader.followerSnapshotSize());
698         assertEquals(1, leader.followerLogSize());
699         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
700         assertNotNull(fli);
701         assertEquals(snapshotIndex, fli.getMatchIndex());
702         assertEquals(snapshotIndex, fli.getMatchIndex());
703         assertEquals(snapshotIndex + 1, fli.getNextIndex());
704     }
705
706     @Test
707     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
708         logStart("testSendSnapshotfromInstallSnapshotReply");
709
710         MockRaftActorContext actorContext = createActorContextWithFollower();
711
712         final int followersLastIndex = 2;
713         final int snapshotIndex = 3;
714         final int snapshotTerm = 1;
715         final int currentTerm = 2;
716
717         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
718             @Override
719             public int getSnapshotChunkSize() {
720                 return 50;
721             }
722         };
723         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
724         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
725
726         actorContext.setConfigParams(configParams);
727         actorContext.setCommitIndex(followersLastIndex);
728
729         leader = new Leader(actorContext);
730
731         Map<String, String> leadersSnapshot = new HashMap<>();
732         leadersSnapshot.put("1", "A");
733         leadersSnapshot.put("2", "B");
734         leadersSnapshot.put("3", "C");
735
736         // set the snapshot variables in replicatedlog
737         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
738         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
739         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
740
741         ByteString bs = toByteString(leadersSnapshot);
742         leader.setSnapshot(Optional.of(bs));
743
744         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
745
746         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
747
748         assertEquals(1, installSnapshot.getChunkIndex());
749         assertEquals(3, installSnapshot.getTotalChunks());
750
751         followerActor.underlyingActor().clear();
752         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
753                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
754
755         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
756
757         assertEquals(2, installSnapshot.getChunkIndex());
758         assertEquals(3, installSnapshot.getTotalChunks());
759
760         followerActor.underlyingActor().clear();
761         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
762                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
763
764         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
765
766         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
767         followerActor.underlyingActor().clear();
768         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
769                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
770
771         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
772
773         Assert.assertNull(installSnapshot);
774     }
775
776
777     @Test
778     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
779         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
780
781         MockRaftActorContext actorContext = createActorContextWithFollower();
782
783         final int followersLastIndex = 2;
784         final int snapshotIndex = 3;
785         final int snapshotTerm = 1;
786         final int currentTerm = 2;
787
788         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
789             @Override
790             public int getSnapshotChunkSize() {
791                 return 50;
792             }
793         });
794
795         actorContext.setCommitIndex(followersLastIndex);
796
797         leader = new Leader(actorContext);
798
799         Map<String, String> leadersSnapshot = new HashMap<>();
800         leadersSnapshot.put("1", "A");
801         leadersSnapshot.put("2", "B");
802         leadersSnapshot.put("3", "C");
803
804         // set the snapshot variables in replicatedlog
805         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
806         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
807         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
808
809         ByteString bs = toByteString(leadersSnapshot);
810         leader.setSnapshot(Optional.of(bs));
811
812         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
813         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
814
815         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
816
817         assertEquals(1, installSnapshot.getChunkIndex());
818         assertEquals(3, installSnapshot.getTotalChunks());
819
820         followerActor.underlyingActor().clear();
821
822         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
823                 FOLLOWER_ID, -1, false));
824
825         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
826                 TimeUnit.MILLISECONDS);
827
828         leader.handleMessage(leaderActor, new SendHeartBeat());
829
830         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
831
832         assertEquals(1, installSnapshot.getChunkIndex());
833         assertEquals(3, installSnapshot.getTotalChunks());
834     }
835
836     @Test
837     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
838         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
839
840         MockRaftActorContext actorContext = createActorContextWithFollower();
841
842         final int followersLastIndex = 2;
843         final int snapshotIndex = 3;
844         final int snapshotTerm = 1;
845         final int currentTerm = 2;
846
847         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
848             @Override
849             public int getSnapshotChunkSize() {
850                 return 50;
851             }
852         });
853
854         actorContext.setCommitIndex(followersLastIndex);
855
856         leader = new Leader(actorContext);
857
858         Map<String, String> leadersSnapshot = new HashMap<>();
859         leadersSnapshot.put("1", "A");
860         leadersSnapshot.put("2", "B");
861         leadersSnapshot.put("3", "C");
862
863         // set the snapshot variables in replicatedlog
864         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
865         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
866         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
867
868         ByteString bs = toByteString(leadersSnapshot);
869         leader.setSnapshot(Optional.of(bs));
870
871         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
872
873         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
874
875         assertEquals(1, installSnapshot.getChunkIndex());
876         assertEquals(3, installSnapshot.getTotalChunks());
877         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
878
879         int hashCode = installSnapshot.getData().hashCode();
880
881         followerActor.underlyingActor().clear();
882
883         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
884                 FOLLOWER_ID, 1, true));
885
886         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
887
888         assertEquals(2, installSnapshot.getChunkIndex());
889         assertEquals(3, installSnapshot.getTotalChunks());
890         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
891     }
892
893     @Test
894     public void testFollowerToSnapshotLogic() {
895         logStart("testFollowerToSnapshotLogic");
896
897         MockRaftActorContext actorContext = createActorContext();
898
899         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
900             @Override
901             public int getSnapshotChunkSize() {
902                 return 50;
903             }
904         });
905
906         leader = new Leader(actorContext);
907
908         Map<String, String> leadersSnapshot = new HashMap<>();
909         leadersSnapshot.put("1", "A");
910         leadersSnapshot.put("2", "B");
911         leadersSnapshot.put("3", "C");
912
913         ByteString bs = toByteString(leadersSnapshot);
914         byte[] barray = bs.toByteArray();
915
916         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
917         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
918
919         assertEquals(bs.size(), barray.length);
920
921         int chunkIndex=0;
922         for (int i=0; i < barray.length; i = i + 50) {
923             int j = i + 50;
924             chunkIndex++;
925
926             if (i + 50 > barray.length) {
927                 j = barray.length;
928             }
929
930             ByteString chunk = fts.getNextChunk();
931             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
932             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
933
934             fts.markSendStatus(true);
935             if (!fts.isLastChunk(chunkIndex)) {
936                 fts.incrementChunkIndex();
937             }
938         }
939
940         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
941     }
942
943     @Override protected RaftActorBehavior createBehavior(
944         RaftActorContext actorContext) {
945         return new Leader(actorContext);
946     }
947
948     @Override
949     protected MockRaftActorContext createActorContext() {
950         return createActorContext(leaderActor);
951     }
952
953     @Override
954     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
955         return createActorContext(LEADER_ID, actorRef);
956     }
957
958     private MockRaftActorContext createActorContextWithFollower() {
959         MockRaftActorContext actorContext = createActorContext();
960         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
961                 followerActor.path().toString()).build());
962         return actorContext;
963     }
964
965     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
966         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
967         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
968         configParams.setElectionTimeoutFactor(100000);
969         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
970         context.setConfigParams(configParams);
971         context.setPayloadVersion(payloadVersion);
972         return context;
973     }
974
975     private MockRaftActorContext createFollowerActorContextWithLeader() {
976         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
977         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
978         followerConfig.setElectionTimeoutFactor(10000);
979         followerActorContext.setConfigParams(followerConfig);
980         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
981         return followerActorContext;
982     }
983
984     @Test
985     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
986         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
987
988         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
989
990         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
991
992         Follower follower = new Follower(followerActorContext);
993         followerActor.underlyingActor().setBehavior(follower);
994
995         Map<String, String> peerAddresses = new HashMap<>();
996         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
997
998         leaderActorContext.setPeerAddresses(peerAddresses);
999
1000         leaderActorContext.getReplicatedLog().removeFrom(0);
1001
1002         //create 3 entries
1003         leaderActorContext.setReplicatedLog(
1004                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1005
1006         leaderActorContext.setCommitIndex(1);
1007
1008         followerActorContext.getReplicatedLog().removeFrom(0);
1009
1010         // follower too has the exact same log entries and has the same commit index
1011         followerActorContext.setReplicatedLog(
1012                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1013
1014         followerActorContext.setCommitIndex(1);
1015
1016         leader = new Leader(leaderActorContext);
1017
1018         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1019
1020         assertEquals(1, appendEntries.getLeaderCommit());
1021         assertEquals(0, appendEntries.getEntries().size());
1022         assertEquals(0, appendEntries.getPrevLogIndex());
1023
1024         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1025                 leaderActor, AppendEntriesReply.class);
1026
1027         assertEquals(2, appendEntriesReply.getLogLastIndex());
1028         assertEquals(1, appendEntriesReply.getLogLastTerm());
1029
1030         // follower returns its next index
1031         assertEquals(2, appendEntriesReply.getLogLastIndex());
1032         assertEquals(1, appendEntriesReply.getLogLastTerm());
1033
1034         follower.close();
1035     }
1036
1037     @Test
1038     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1039         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1040
1041         MockRaftActorContext leaderActorContext = createActorContext();
1042
1043         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1044         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1045
1046         Follower follower = new Follower(followerActorContext);
1047         followerActor.underlyingActor().setBehavior(follower);
1048
1049         Map<String, String> leaderPeerAddresses = new HashMap<>();
1050         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1051
1052         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1053
1054         leaderActorContext.getReplicatedLog().removeFrom(0);
1055
1056         leaderActorContext.setReplicatedLog(
1057                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1058
1059         leaderActorContext.setCommitIndex(1);
1060
1061         followerActorContext.getReplicatedLog().removeFrom(0);
1062
1063         followerActorContext.setReplicatedLog(
1064                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1065
1066         // follower has the same log entries but its commit index > leaders commit index
1067         followerActorContext.setCommitIndex(2);
1068
1069         leader = new Leader(leaderActorContext);
1070
1071         // Initial heartbeat
1072         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1073
1074         assertEquals(1, appendEntries.getLeaderCommit());
1075         assertEquals(0, appendEntries.getEntries().size());
1076         assertEquals(0, appendEntries.getPrevLogIndex());
1077
1078         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1079                 leaderActor, AppendEntriesReply.class);
1080
1081         assertEquals(2, appendEntriesReply.getLogLastIndex());
1082         assertEquals(1, appendEntriesReply.getLogLastTerm());
1083
1084         leaderActor.underlyingActor().setBehavior(follower);
1085         leader.handleMessage(followerActor, appendEntriesReply);
1086
1087         leaderActor.underlyingActor().clear();
1088         followerActor.underlyingActor().clear();
1089
1090         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1091                 TimeUnit.MILLISECONDS);
1092
1093         leader.handleMessage(leaderActor, new SendHeartBeat());
1094
1095         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1096
1097         assertEquals(2, appendEntries.getLeaderCommit());
1098         assertEquals(0, appendEntries.getEntries().size());
1099         assertEquals(2, appendEntries.getPrevLogIndex());
1100
1101         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1102
1103         assertEquals(2, appendEntriesReply.getLogLastIndex());
1104         assertEquals(1, appendEntriesReply.getLogLastTerm());
1105
1106         assertEquals(2, followerActorContext.getCommitIndex());
1107
1108         follower.close();
1109     }
1110
1111     @Test
1112     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
1113         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1114
1115         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1116         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1117                 new FiniteDuration(1000, TimeUnit.SECONDS));
1118
1119         leaderActorContext.setReplicatedLog(
1120                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1121         long leaderCommitIndex = 2;
1122         leaderActorContext.setCommitIndex(leaderCommitIndex);
1123         leaderActorContext.setLastApplied(leaderCommitIndex);
1124
1125         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1126         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1127
1128         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1129
1130         followerActorContext.setReplicatedLog(
1131                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1132         followerActorContext.setCommitIndex(0);
1133         followerActorContext.setLastApplied(0);
1134
1135         Follower follower = new Follower(followerActorContext);
1136         followerActor.underlyingActor().setBehavior(follower);
1137
1138         leader = new Leader(leaderActorContext);
1139
1140         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1141         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1142
1143         MessageCollectorActor.clearMessages(followerActor);
1144         MessageCollectorActor.clearMessages(leaderActor);
1145
1146         // Verify initial AppendEntries sent with the leader's current commit index.
1147         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1148         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1149         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1150
1151         leaderActor.underlyingActor().setBehavior(leader);
1152
1153         leader.handleMessage(followerActor, appendEntriesReply);
1154
1155         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1156         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1157
1158         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1159         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1160         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1161
1162         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1163         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1164                 appendEntries.getEntries().get(0).getData());
1165         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1166         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1167                 appendEntries.getEntries().get(1).getData());
1168
1169         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1170         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1171
1172         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1173
1174         ApplyState applyState = applyStateList.get(0);
1175         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1176         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1177         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1178                 applyState.getReplicatedLogEntry().getData());
1179
1180         applyState = applyStateList.get(1);
1181         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1182         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1183         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1184                 applyState.getReplicatedLogEntry().getData());
1185
1186         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1187         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1188     }
1189
1190     @Test
1191     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1192         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1193
1194         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1195         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1196                 new FiniteDuration(1000, TimeUnit.SECONDS));
1197
1198         leaderActorContext.setReplicatedLog(
1199                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1200         long leaderCommitIndex = 1;
1201         leaderActorContext.setCommitIndex(leaderCommitIndex);
1202         leaderActorContext.setLastApplied(leaderCommitIndex);
1203
1204         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1205         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1206
1207         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1208
1209         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1210         followerActorContext.setCommitIndex(-1);
1211         followerActorContext.setLastApplied(-1);
1212
1213         Follower follower = new Follower(followerActorContext);
1214         followerActor.underlyingActor().setBehavior(follower);
1215
1216         leader = new Leader(leaderActorContext);
1217
1218         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1219         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1220
1221         MessageCollectorActor.clearMessages(followerActor);
1222         MessageCollectorActor.clearMessages(leaderActor);
1223
1224         // Verify initial AppendEntries sent with the leader's current commit index.
1225         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1226         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1227         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1228
1229         leaderActor.underlyingActor().setBehavior(leader);
1230
1231         leader.handleMessage(followerActor, appendEntriesReply);
1232
1233         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1234         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1235
1236         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1237         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1238         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1239
1240         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1241         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1242                 appendEntries.getEntries().get(0).getData());
1243         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1244         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1245                 appendEntries.getEntries().get(1).getData());
1246
1247         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1248         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1249
1250         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1251
1252         ApplyState applyState = applyStateList.get(0);
1253         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1254         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1255         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1256                 applyState.getReplicatedLogEntry().getData());
1257
1258         applyState = applyStateList.get(1);
1259         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1260         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1261         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1262                 applyState.getReplicatedLogEntry().getData());
1263
1264         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1265         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1266     }
1267
1268     @Test
1269     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
1270         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1271
1272         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1273         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1274                 new FiniteDuration(1000, TimeUnit.SECONDS));
1275
1276         leaderActorContext.setReplicatedLog(
1277                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1278         long leaderCommitIndex = 1;
1279         leaderActorContext.setCommitIndex(leaderCommitIndex);
1280         leaderActorContext.setLastApplied(leaderCommitIndex);
1281
1282         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1283         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1284
1285         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1286
1287         followerActorContext.setReplicatedLog(
1288                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1289         followerActorContext.setCommitIndex(-1);
1290         followerActorContext.setLastApplied(-1);
1291
1292         Follower follower = new Follower(followerActorContext);
1293         followerActor.underlyingActor().setBehavior(follower);
1294
1295         leader = new Leader(leaderActorContext);
1296
1297         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1298         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1299
1300         MessageCollectorActor.clearMessages(followerActor);
1301         MessageCollectorActor.clearMessages(leaderActor);
1302
1303         // Verify initial AppendEntries sent with the leader's current commit index.
1304         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1305         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1306         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1307
1308         leaderActor.underlyingActor().setBehavior(leader);
1309
1310         leader.handleMessage(followerActor, appendEntriesReply);
1311
1312         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1313         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1314
1315         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1316         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1317         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1318
1319         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1320         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1321         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1322                 appendEntries.getEntries().get(0).getData());
1323         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1324         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1325         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1326                 appendEntries.getEntries().get(1).getData());
1327
1328         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1329         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1330
1331         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1332
1333         ApplyState applyState = applyStateList.get(0);
1334         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1335         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1336         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1337                 applyState.getReplicatedLogEntry().getData());
1338
1339         applyState = applyStateList.get(1);
1340         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1341         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1342         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1343                 applyState.getReplicatedLogEntry().getData());
1344
1345         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1346         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1347         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1348     }
1349
1350     @Test
1351     public void testHandleAppendEntriesReplySuccess() throws Exception {
1352         logStart("testHandleAppendEntriesReplySuccess");
1353
1354         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1355
1356         leaderActorContext.setReplicatedLog(
1357                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1358
1359         leaderActorContext.setCommitIndex(1);
1360         leaderActorContext.setLastApplied(1);
1361         leaderActorContext.getTermInformation().update(1, "leader");
1362
1363         leader = new Leader(leaderActorContext);
1364
1365         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1366
1367         short payloadVersion = 5;
1368         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1369
1370         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1371
1372         assertEquals(RaftState.Leader, raftActorBehavior.state());
1373
1374         assertEquals(2, leaderActorContext.getCommitIndex());
1375
1376         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1377                 leaderActor, ApplyJournalEntries.class);
1378
1379         assertEquals(2, leaderActorContext.getLastApplied());
1380
1381         assertEquals(2, applyJournalEntries.getToIndex());
1382
1383         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1384                 ApplyState.class);
1385
1386         assertEquals(1,applyStateList.size());
1387
1388         ApplyState applyState = applyStateList.get(0);
1389
1390         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1391
1392         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1393         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1394     }
1395
1396     @Test
1397     public void testHandleAppendEntriesReplyUnknownFollower(){
1398         logStart("testHandleAppendEntriesReplyUnknownFollower");
1399
1400         MockRaftActorContext leaderActorContext = createActorContext();
1401
1402         leader = new Leader(leaderActorContext);
1403
1404         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1405
1406         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1407
1408         assertEquals(RaftState.Leader, raftActorBehavior.state());
1409     }
1410
1411     @Test
1412     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1413         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1414
1415         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1416         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1417                 new FiniteDuration(1000, TimeUnit.SECONDS));
1418         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
1419
1420         leaderActorContext.setReplicatedLog(
1421                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1422         long leaderCommitIndex = 3;
1423         leaderActorContext.setCommitIndex(leaderCommitIndex);
1424         leaderActorContext.setLastApplied(leaderCommitIndex);
1425
1426         ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1427         ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1428         ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1429         ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1430
1431         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1432
1433         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1434         followerActorContext.setCommitIndex(-1);
1435         followerActorContext.setLastApplied(-1);
1436
1437         Follower follower = new Follower(followerActorContext);
1438         followerActor.underlyingActor().setBehavior(follower);
1439
1440         leader = new Leader(leaderActorContext);
1441
1442         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1443         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1444
1445         MessageCollectorActor.clearMessages(followerActor);
1446         MessageCollectorActor.clearMessages(leaderActor);
1447
1448         // Verify initial AppendEntries sent with the leader's current commit index.
1449         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1450         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1451         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1452
1453         leaderActor.underlyingActor().setBehavior(leader);
1454
1455         leader.handleMessage(followerActor, appendEntriesReply);
1456
1457         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
1458         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1459
1460         appendEntries = appendEntriesList.get(0);
1461         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1462         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1463         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1464
1465         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1466         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1467                 appendEntries.getEntries().get(0).getData());
1468         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1469         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1470                 appendEntries.getEntries().get(1).getData());
1471
1472         appendEntries = appendEntriesList.get(1);
1473         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1474         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1475         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1476
1477         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1478         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1479                 appendEntries.getEntries().get(0).getData());
1480         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1481         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1482                 appendEntries.getEntries().get(1).getData());
1483
1484         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1485         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1486
1487         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1488
1489         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1490         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1491     }
1492
1493     @Test
1494     public void testHandleRequestVoteReply(){
1495         logStart("testHandleRequestVoteReply");
1496
1497         MockRaftActorContext leaderActorContext = createActorContext();
1498
1499         leader = new Leader(leaderActorContext);
1500
1501         // Should be a no-op.
1502         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1503                 new RequestVoteReply(1, true));
1504
1505         assertEquals(RaftState.Leader, raftActorBehavior.state());
1506
1507         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1508
1509         assertEquals(RaftState.Leader, raftActorBehavior.state());
1510     }
1511
1512     @Test
1513     public void testIsolatedLeaderCheckNoFollowers() {
1514         logStart("testIsolatedLeaderCheckNoFollowers");
1515
1516         MockRaftActorContext leaderActorContext = createActorContext();
1517
1518         leader = new Leader(leaderActorContext);
1519         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1520         Assert.assertTrue(behavior instanceof Leader);
1521     }
1522
1523     @Test
1524     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1525         logStart("testIsolatedLeaderCheckTwoFollowers");
1526
1527         new JavaTestKit(getSystem()) {{
1528
1529             ActorRef followerActor1 = getTestActor();
1530             ActorRef followerActor2 = getTestActor();
1531
1532             MockRaftActorContext leaderActorContext = createActorContext();
1533
1534             Map<String, String> peerAddresses = new HashMap<>();
1535             peerAddresses.put("follower-1", followerActor1.path().toString());
1536             peerAddresses.put("follower-2", followerActor2.path().toString());
1537
1538             leaderActorContext.setPeerAddresses(peerAddresses);
1539
1540             leader = new Leader(leaderActorContext);
1541
1542             leader.markFollowerActive("follower-1");
1543             leader.markFollowerActive("follower-2");
1544             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1545             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1546                 behavior instanceof Leader);
1547
1548             // kill 1 follower and verify if that got killed
1549             final JavaTestKit probe = new JavaTestKit(getSystem());
1550             probe.watch(followerActor1);
1551             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1552             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1553             assertEquals(termMsg1.getActor(), followerActor1);
1554
1555             leader.markFollowerInActive("follower-1");
1556             leader.markFollowerActive("follower-2");
1557             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1558             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1559                 behavior instanceof Leader);
1560
1561             // kill 2nd follower and leader should change to Isolated leader
1562             followerActor2.tell(PoisonPill.getInstance(), null);
1563             probe.watch(followerActor2);
1564             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1565             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1566             assertEquals(termMsg2.getActor(), followerActor2);
1567
1568             leader.markFollowerInActive("follower-2");
1569             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1570             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1571                 behavior instanceof IsolatedLeader);
1572         }};
1573     }
1574
1575     @Test
1576     public void testLaggingFollowerStarvation() throws Exception {
1577         logStart("testLaggingFollowerStarvation");
1578         new JavaTestKit(getSystem()) {{
1579             String leaderActorId = actorFactory.generateActorId("leader");
1580             String follower1ActorId = actorFactory.generateActorId("follower");
1581             String follower2ActorId = actorFactory.generateActorId("follower");
1582
1583             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1584                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1585             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1586             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1587
1588             MockRaftActorContext leaderActorContext =
1589                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1590
1591             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1592             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1593             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1594
1595             leaderActorContext.setConfigParams(configParams);
1596
1597             leaderActorContext.setReplicatedLog(
1598                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1599
1600             Map<String, String> peerAddresses = new HashMap<>();
1601             peerAddresses.put(follower1ActorId,
1602                     follower1Actor.path().toString());
1603             peerAddresses.put(follower2ActorId,
1604                     follower2Actor.path().toString());
1605
1606             leaderActorContext.setPeerAddresses(peerAddresses);
1607             leaderActorContext.getTermInformation().update(1, leaderActorId);
1608
1609             RaftActorBehavior leader = createBehavior(leaderActorContext);
1610
1611             leaderActor.underlyingActor().setBehavior(leader);
1612
1613             for(int i=1;i<6;i++) {
1614                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1615                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1616                 assertTrue(newBehavior == leader);
1617                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1618             }
1619
1620             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1621             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1622
1623             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1624                     heartbeats.size() > 1);
1625
1626             // Check if follower-2 got AppendEntries during this time and was not starved
1627             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1628
1629             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1630                     appendEntries.size() > 1);
1631
1632         }};
1633     }
1634
1635     @Override
1636     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1637             ActorRef actorRef, RaftRPC rpc) throws Exception {
1638         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1639         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1640     }
1641
1642     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1643
1644         private final long electionTimeOutIntervalMillis;
1645         private final int snapshotChunkSize;
1646
1647         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1648             super();
1649             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1650             this.snapshotChunkSize = snapshotChunkSize;
1651         }
1652
1653         @Override
1654         public FiniteDuration getElectionTimeOutInterval() {
1655             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1656         }
1657
1658         @Override
1659         public int getSnapshotChunkSize() {
1660             return snapshotChunkSize;
1661         }
1662     }
1663 }