Bug 3020: Add version to AppendEntries and AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
45 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractLeaderTest {
49
50     static final String FOLLOWER_ID = "follower";
51     public static final String LEADER_ID = "leader";
52
53     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
54             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
55
56     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
57             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
58
59     private Leader leader;
60
61     @Override
62     @After
63     public void tearDown() throws Exception {
64         if(leader != null) {
65             leader.close();
66         }
67
68         super.tearDown();
69     }
70
71     @Test
72     public void testHandleMessageForUnknownMessage() throws Exception {
73         logStart("testHandleMessageForUnknownMessage");
74
75         leader = new Leader(createActorContext());
76
77         // handle message should return the Leader state when it receives an
78         // unknown message
79         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
80         Assert.assertTrue(behavior instanceof Leader);
81     }
82
83     @Test
84     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
85         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
86
87         MockRaftActorContext actorContext = createActorContextWithFollower();
88         short payloadVersion = (short)5;
89         actorContext.setPayloadVersion(payloadVersion);
90
91         long term = 1;
92         actorContext.getTermInformation().update(term, "");
93
94         leader = new Leader(actorContext);
95
96         // Leader should send an immediate heartbeat with no entries as follower is inactive.
97         long lastIndex = actorContext.getReplicatedLog().lastIndex();
98         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
99         assertEquals("getTerm", term, appendEntries.getTerm());
100         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
101         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
102         assertEquals("Entries size", 0, appendEntries.getEntries().size());
103         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
104
105         // The follower would normally reply - simulate that explicitly here.
106         leader.handleMessage(followerActor, new AppendEntriesReply(
107                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
108         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
109
110         followerActor.underlyingActor().clear();
111
112         // Sleep for the heartbeat interval so AppendEntries is sent.
113         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
114                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
115
116         leader.handleMessage(leaderActor, new SendHeartBeat());
117
118         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
119         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
120         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
121         assertEquals("Entries size", 1, appendEntries.getEntries().size());
122         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
123         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
124         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
125     }
126
127
128     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
129         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
130         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
131                 1, index, payload);
132         actorContext.getReplicatedLog().append(newEntry);
133         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
134     }
135
136     @Test
137     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
138         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
139
140         MockRaftActorContext actorContext = createActorContextWithFollower();
141
142         long term = 1;
143         actorContext.getTermInformation().update(term, "");
144
145         leader = new Leader(actorContext);
146
147         // Leader will send an immediate heartbeat - ignore it.
148         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
149
150         // The follower would normally reply - simulate that explicitly here.
151         long lastIndex = actorContext.getReplicatedLog().lastIndex();
152         leader.handleMessage(followerActor, new AppendEntriesReply(
153                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
154         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
155
156         followerActor.underlyingActor().clear();
157
158         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
159         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
160                 1, lastIndex + 1, payload);
161         actorContext.getReplicatedLog().append(newEntry);
162         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
163
164         // State should not change
165         assertTrue(raftBehavior instanceof Leader);
166
167         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
168         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
169         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
170         assertEquals("Entries size", 1, appendEntries.getEntries().size());
171         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
172         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
173         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
174     }
175
176     @Test
177     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
178         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
179
180         MockRaftActorContext actorContext = createActorContextWithFollower();
181         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
182             @Override
183             public FiniteDuration getHeartBeatInterval() {
184                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
185             }
186         });
187
188         long term = 1;
189         actorContext.getTermInformation().update(term, "");
190
191         leader = new Leader(actorContext);
192
193         // Leader will send an immediate heartbeat - ignore it.
194         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
195
196         // The follower would normally reply - simulate that explicitly here.
197         long lastIndex = actorContext.getReplicatedLog().lastIndex();
198         leader.handleMessage(followerActor, new AppendEntriesReply(
199                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
200         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
201
202         followerActor.underlyingActor().clear();
203
204         for(int i=0;i<5;i++) {
205             sendReplicate(actorContext, lastIndex+i+1);
206         }
207
208         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
209         // We expect only 1 message to be sent because of two reasons,
210         // - an append entries reply was not received
211         // - the heartbeat interval has not expired
212         // In this scenario if multiple messages are sent they would likely be duplicates
213         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
214     }
215
216     @Test
217     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
218         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
219
220         MockRaftActorContext actorContext = createActorContextWithFollower();
221         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
222             @Override
223             public FiniteDuration getHeartBeatInterval() {
224                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
225             }
226         });
227
228         long term = 1;
229         actorContext.getTermInformation().update(term, "");
230
231         leader = new Leader(actorContext);
232
233         // Leader will send an immediate heartbeat - ignore it.
234         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
235
236         // The follower would normally reply - simulate that explicitly here.
237         long lastIndex = actorContext.getReplicatedLog().lastIndex();
238         leader.handleMessage(followerActor, new AppendEntriesReply(
239                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
240         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
241
242         followerActor.underlyingActor().clear();
243
244         for(int i=0;i<3;i++) {
245             sendReplicate(actorContext, lastIndex+i+1);
246             leader.handleMessage(followerActor, new AppendEntriesReply(
247                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
248
249         }
250
251         for(int i=3;i<5;i++) {
252             sendReplicate(actorContext, lastIndex + i + 1);
253         }
254
255         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
256         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
257         // get sent to the follower - but not the 5th
258         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
259
260         for(int i=0;i<4;i++) {
261             long expected = allMessages.get(i).getEntries().get(0).getIndex();
262             assertEquals(expected, i+2);
263         }
264     }
265
266     @Test
267     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
268         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
269
270         MockRaftActorContext actorContext = createActorContextWithFollower();
271         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
272             @Override
273             public FiniteDuration getHeartBeatInterval() {
274                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
275             }
276         });
277
278         long term = 1;
279         actorContext.getTermInformation().update(term, "");
280
281         leader = new Leader(actorContext);
282
283         // Leader will send an immediate heartbeat - ignore it.
284         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
285
286         // The follower would normally reply - simulate that explicitly here.
287         long lastIndex = actorContext.getReplicatedLog().lastIndex();
288         leader.handleMessage(followerActor, new AppendEntriesReply(
289                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
290         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
291
292         followerActor.underlyingActor().clear();
293
294         sendReplicate(actorContext, lastIndex+1);
295
296         // Wait slightly longer than heartbeat duration
297         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
298
299         leader.handleMessage(leaderActor, new SendHeartBeat());
300
301         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
302         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
303
304         assertEquals(1, allMessages.get(0).getEntries().size());
305         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
306         assertEquals(1, allMessages.get(1).getEntries().size());
307         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
308
309     }
310
311     @Test
312     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
313         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
314
315         MockRaftActorContext actorContext = createActorContextWithFollower();
316         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
317             @Override
318             public FiniteDuration getHeartBeatInterval() {
319                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
320             }
321         });
322
323         long term = 1;
324         actorContext.getTermInformation().update(term, "");
325
326         leader = new Leader(actorContext);
327
328         // Leader will send an immediate heartbeat - ignore it.
329         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
330
331         // The follower would normally reply - simulate that explicitly here.
332         long lastIndex = actorContext.getReplicatedLog().lastIndex();
333         leader.handleMessage(followerActor, new AppendEntriesReply(
334                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
335         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
336
337         followerActor.underlyingActor().clear();
338
339         for(int i=0;i<3;i++) {
340             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
341             leader.handleMessage(leaderActor, new SendHeartBeat());
342         }
343
344         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
345         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
346     }
347
348     @Test
349     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
350         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
351
352         MockRaftActorContext actorContext = createActorContextWithFollower();
353         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
354             @Override
355             public FiniteDuration getHeartBeatInterval() {
356                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
357             }
358         });
359
360         long term = 1;
361         actorContext.getTermInformation().update(term, "");
362
363         leader = new Leader(actorContext);
364
365         // Leader will send an immediate heartbeat - ignore it.
366         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
367
368         // The follower would normally reply - simulate that explicitly here.
369         long lastIndex = actorContext.getReplicatedLog().lastIndex();
370         leader.handleMessage(followerActor, new AppendEntriesReply(
371                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
372         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
373
374         followerActor.underlyingActor().clear();
375
376         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
377         leader.handleMessage(leaderActor, new SendHeartBeat());
378         sendReplicate(actorContext, lastIndex+1);
379
380         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
381         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
382
383         assertEquals(0, allMessages.get(0).getEntries().size());
384         assertEquals(1, allMessages.get(1).getEntries().size());
385     }
386
387
388     @Test
389     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
390         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
391
392         MockRaftActorContext actorContext = createActorContext();
393
394         leader = new Leader(actorContext);
395
396         actorContext.setLastApplied(0);
397
398         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
399         long term = actorContext.getTermInformation().getCurrentTerm();
400         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
401                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
402
403         actorContext.getReplicatedLog().append(newEntry);
404
405         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
406                 new Replicate(leaderActor, "state-id", newEntry));
407
408         // State should not change
409         assertTrue(raftBehavior instanceof Leader);
410
411         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
412
413         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
414         // one since lastApplied state is 0.
415         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
416                 leaderActor, ApplyState.class);
417         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
418
419         for(int i = 0; i <= newLogIndex - 1; i++ ) {
420             ApplyState applyState = applyStateList.get(i);
421             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
422             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
423         }
424
425         ApplyState last = applyStateList.get((int) newLogIndex - 1);
426         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
427         assertEquals("getIdentifier", "state-id", last.getIdentifier());
428     }
429
430     @Test
431     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
432         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
433
434         MockRaftActorContext actorContext = createActorContextWithFollower();
435
436         Map<String, String> leadersSnapshot = new HashMap<>();
437         leadersSnapshot.put("1", "A");
438         leadersSnapshot.put("2", "B");
439         leadersSnapshot.put("3", "C");
440
441         //clears leaders log
442         actorContext.getReplicatedLog().removeFrom(0);
443
444         final int followersLastIndex = 2;
445         final int snapshotIndex = 3;
446         final int newEntryIndex = 4;
447         final int snapshotTerm = 1;
448         final int currentTerm = 2;
449
450         // set the snapshot variables in replicatedlog
451         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
452         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
453         actorContext.setCommitIndex(followersLastIndex);
454         //set follower timeout to 2 mins, helps during debugging
455         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
456
457         leader = new Leader(actorContext);
458
459         // new entry
460         ReplicatedLogImplEntry entry =
461                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
462                         new MockRaftActorContext.MockPayload("D"));
463
464         //update follower timestamp
465         leader.markFollowerActive(FOLLOWER_ID);
466
467         ByteString bs = toByteString(leadersSnapshot);
468         leader.setSnapshot(Optional.of(bs));
469         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
470         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
471
472         //send first chunk and no InstallSnapshotReply received yet
473         fts.getNextChunk();
474         fts.incrementChunkIndex();
475
476         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
477                 TimeUnit.MILLISECONDS);
478
479         leader.handleMessage(leaderActor, new SendHeartBeat());
480
481         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
482
483         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
484
485         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
486
487         //InstallSnapshotReply received
488         fts.markSendStatus(true);
489
490         leader.handleMessage(leaderActor, new SendHeartBeat());
491
492         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
493
494         assertEquals(snapshotIndex, is.getLastIncludedIndex());
495     }
496
497     @Test
498     public void testSendAppendEntriesSnapshotScenario() throws Exception {
499         logStart("testSendAppendEntriesSnapshotScenario");
500
501         MockRaftActorContext actorContext = createActorContextWithFollower();
502
503         Map<String, String> leadersSnapshot = new HashMap<>();
504         leadersSnapshot.put("1", "A");
505         leadersSnapshot.put("2", "B");
506         leadersSnapshot.put("3", "C");
507
508         //clears leaders log
509         actorContext.getReplicatedLog().removeFrom(0);
510
511         final int followersLastIndex = 2;
512         final int snapshotIndex = 3;
513         final int newEntryIndex = 4;
514         final int snapshotTerm = 1;
515         final int currentTerm = 2;
516
517         // set the snapshot variables in replicatedlog
518         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
519         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
520         actorContext.setCommitIndex(followersLastIndex);
521
522         leader = new Leader(actorContext);
523
524         // Leader will send an immediate heartbeat - ignore it.
525         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
526
527         // new entry
528         ReplicatedLogImplEntry entry =
529                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
530                         new MockRaftActorContext.MockPayload("D"));
531
532         actorContext.getReplicatedLog().append(entry);
533
534         //update follower timestamp
535         leader.markFollowerActive(FOLLOWER_ID);
536
537         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
538         RaftActorBehavior raftBehavior = leader.handleMessage(
539                 leaderActor, new Replicate(null, "state-id", entry));
540
541         assertTrue(raftBehavior instanceof Leader);
542
543         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
544     }
545
546     @Test
547     public void testInitiateInstallSnapshot() throws Exception {
548         logStart("testInitiateInstallSnapshot");
549
550         MockRaftActorContext actorContext = createActorContextWithFollower();
551
552         Map<String, String> leadersSnapshot = new HashMap<>();
553         leadersSnapshot.put("1", "A");
554         leadersSnapshot.put("2", "B");
555         leadersSnapshot.put("3", "C");
556
557         //clears leaders log
558         actorContext.getReplicatedLog().removeFrom(0);
559
560         final int followersLastIndex = 2;
561         final int snapshotIndex = 3;
562         final int newEntryIndex = 4;
563         final int snapshotTerm = 1;
564         final int currentTerm = 2;
565
566         // set the snapshot variables in replicatedlog
567         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
568         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
569         actorContext.setLastApplied(3);
570         actorContext.setCommitIndex(followersLastIndex);
571
572         leader = new Leader(actorContext);
573
574         // Leader will send an immediate heartbeat - ignore it.
575         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
576
577         // set the snapshot as absent and check if capture-snapshot is invoked.
578         leader.setSnapshot(Optional.<ByteString>absent());
579
580         // new entry
581         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
582                 new MockRaftActorContext.MockPayload("D"));
583
584         actorContext.getReplicatedLog().append(entry);
585
586         //update follower timestamp
587         leader.markFollowerActive(FOLLOWER_ID);
588
589         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
590
591         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
592
593         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
594
595         assertTrue(cs.isInstallSnapshotInitiated());
596         assertEquals(3, cs.getLastAppliedIndex());
597         assertEquals(1, cs.getLastAppliedTerm());
598         assertEquals(4, cs.getLastIndex());
599         assertEquals(2, cs.getLastTerm());
600
601         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
602         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
603
604         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
605     }
606
607     @Test
608     public void testInstallSnapshot() throws Exception {
609         logStart("testInstallSnapshot");
610
611         MockRaftActorContext actorContext = createActorContextWithFollower();
612
613         Map<String, String> leadersSnapshot = new HashMap<>();
614         leadersSnapshot.put("1", "A");
615         leadersSnapshot.put("2", "B");
616         leadersSnapshot.put("3", "C");
617
618         //clears leaders log
619         actorContext.getReplicatedLog().removeFrom(0);
620
621         final int followersLastIndex = 2;
622         final int snapshotIndex = 3;
623         final int snapshotTerm = 1;
624         final int currentTerm = 2;
625
626         // set the snapshot variables in replicatedlog
627         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
628         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
629         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
630         actorContext.setCommitIndex(followersLastIndex);
631
632         leader = new Leader(actorContext);
633
634         // Ignore initial heartbeat.
635         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
636
637         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
638                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
639
640         assertTrue(raftBehavior instanceof Leader);
641
642         // check if installsnapshot gets called with the correct values.
643
644         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
645
646         assertNotNull(installSnapshot.getData());
647         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
648         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
649
650         assertEquals(currentTerm, installSnapshot.getTerm());
651     }
652
653     @Test
654     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
655         logStart("testHandleInstallSnapshotReplyLastChunk");
656
657         MockRaftActorContext actorContext = createActorContextWithFollower();
658
659         final int followersLastIndex = 2;
660         final int snapshotIndex = 3;
661         final int snapshotTerm = 1;
662         final int currentTerm = 2;
663
664         actorContext.setCommitIndex(followersLastIndex);
665
666         leader = new Leader(actorContext);
667
668         // Ignore initial heartbeat.
669         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
670
671         Map<String, String> leadersSnapshot = new HashMap<>();
672         leadersSnapshot.put("1", "A");
673         leadersSnapshot.put("2", "B");
674         leadersSnapshot.put("3", "C");
675
676         // set the snapshot variables in replicatedlog
677
678         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
679         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
680         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
681
682         ByteString bs = toByteString(leadersSnapshot);
683         leader.setSnapshot(Optional.of(bs));
684         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
685         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
686         while(!fts.isLastChunk(fts.getChunkIndex())) {
687             fts.getNextChunk();
688             fts.incrementChunkIndex();
689         }
690
691         //clears leaders log
692         actorContext.getReplicatedLog().removeFrom(0);
693
694         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
695                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
696
697         assertTrue(raftBehavior instanceof Leader);
698
699         assertEquals(0, leader.followerSnapshotSize());
700         assertEquals(1, leader.followerLogSize());
701         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
702         assertNotNull(fli);
703         assertEquals(snapshotIndex, fli.getMatchIndex());
704         assertEquals(snapshotIndex, fli.getMatchIndex());
705         assertEquals(snapshotIndex + 1, fli.getNextIndex());
706     }
707
708     @Test
709     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
710         logStart("testSendSnapshotfromInstallSnapshotReply");
711
712         MockRaftActorContext actorContext = createActorContextWithFollower();
713
714         final int followersLastIndex = 2;
715         final int snapshotIndex = 3;
716         final int snapshotTerm = 1;
717         final int currentTerm = 2;
718
719         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
720             @Override
721             public int getSnapshotChunkSize() {
722                 return 50;
723             }
724         };
725         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
726         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
727
728         actorContext.setConfigParams(configParams);
729         actorContext.setCommitIndex(followersLastIndex);
730
731         leader = new Leader(actorContext);
732
733         Map<String, String> leadersSnapshot = new HashMap<>();
734         leadersSnapshot.put("1", "A");
735         leadersSnapshot.put("2", "B");
736         leadersSnapshot.put("3", "C");
737
738         // set the snapshot variables in replicatedlog
739         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
740         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
741         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
742
743         ByteString bs = toByteString(leadersSnapshot);
744         leader.setSnapshot(Optional.of(bs));
745
746         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
747
748         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
749
750         assertEquals(1, installSnapshot.getChunkIndex());
751         assertEquals(3, installSnapshot.getTotalChunks());
752
753         followerActor.underlyingActor().clear();
754         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
755                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
756
757         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
758
759         assertEquals(2, installSnapshot.getChunkIndex());
760         assertEquals(3, installSnapshot.getTotalChunks());
761
762         followerActor.underlyingActor().clear();
763         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
764                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
765
766         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
767
768         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
769         followerActor.underlyingActor().clear();
770         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
771                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
772
773         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
774
775         Assert.assertNull(installSnapshot);
776     }
777
778
779     @Test
780     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
781         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
782
783         MockRaftActorContext actorContext = createActorContextWithFollower();
784
785         final int followersLastIndex = 2;
786         final int snapshotIndex = 3;
787         final int snapshotTerm = 1;
788         final int currentTerm = 2;
789
790         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
791             @Override
792             public int getSnapshotChunkSize() {
793                 return 50;
794             }
795         });
796
797         actorContext.setCommitIndex(followersLastIndex);
798
799         leader = new Leader(actorContext);
800
801         Map<String, String> leadersSnapshot = new HashMap<>();
802         leadersSnapshot.put("1", "A");
803         leadersSnapshot.put("2", "B");
804         leadersSnapshot.put("3", "C");
805
806         // set the snapshot variables in replicatedlog
807         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
808         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
809         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
810
811         ByteString bs = toByteString(leadersSnapshot);
812         leader.setSnapshot(Optional.of(bs));
813
814         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
815         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
816
817         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
818
819         assertEquals(1, installSnapshot.getChunkIndex());
820         assertEquals(3, installSnapshot.getTotalChunks());
821
822         followerActor.underlyingActor().clear();
823
824         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
825                 FOLLOWER_ID, -1, false));
826
827         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
828                 TimeUnit.MILLISECONDS);
829
830         leader.handleMessage(leaderActor, new SendHeartBeat());
831
832         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
833
834         assertEquals(1, installSnapshot.getChunkIndex());
835         assertEquals(3, installSnapshot.getTotalChunks());
836     }
837
838     @Test
839     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
840         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
841
842         MockRaftActorContext actorContext = createActorContextWithFollower();
843
844         final int followersLastIndex = 2;
845         final int snapshotIndex = 3;
846         final int snapshotTerm = 1;
847         final int currentTerm = 2;
848
849         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
850             @Override
851             public int getSnapshotChunkSize() {
852                 return 50;
853             }
854         });
855
856         actorContext.setCommitIndex(followersLastIndex);
857
858         leader = new Leader(actorContext);
859
860         Map<String, String> leadersSnapshot = new HashMap<>();
861         leadersSnapshot.put("1", "A");
862         leadersSnapshot.put("2", "B");
863         leadersSnapshot.put("3", "C");
864
865         // set the snapshot variables in replicatedlog
866         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
867         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
868         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
869
870         ByteString bs = toByteString(leadersSnapshot);
871         leader.setSnapshot(Optional.of(bs));
872
873         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
874
875         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
876
877         assertEquals(1, installSnapshot.getChunkIndex());
878         assertEquals(3, installSnapshot.getTotalChunks());
879         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
880
881         int hashCode = installSnapshot.getData().hashCode();
882
883         followerActor.underlyingActor().clear();
884
885         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
886                 FOLLOWER_ID, 1, true));
887
888         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
889
890         assertEquals(2, installSnapshot.getChunkIndex());
891         assertEquals(3, installSnapshot.getTotalChunks());
892         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
893     }
894
895     @Test
896     public void testFollowerToSnapshotLogic() {
897         logStart("testFollowerToSnapshotLogic");
898
899         MockRaftActorContext actorContext = createActorContext();
900
901         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
902             @Override
903             public int getSnapshotChunkSize() {
904                 return 50;
905             }
906         });
907
908         leader = new Leader(actorContext);
909
910         Map<String, String> leadersSnapshot = new HashMap<>();
911         leadersSnapshot.put("1", "A");
912         leadersSnapshot.put("2", "B");
913         leadersSnapshot.put("3", "C");
914
915         ByteString bs = toByteString(leadersSnapshot);
916         byte[] barray = bs.toByteArray();
917
918         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
919         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
920
921         assertEquals(bs.size(), barray.length);
922
923         int chunkIndex=0;
924         for (int i=0; i < barray.length; i = i + 50) {
925             int j = i + 50;
926             chunkIndex++;
927
928             if (i + 50 > barray.length) {
929                 j = barray.length;
930             }
931
932             ByteString chunk = fts.getNextChunk();
933             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
934             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
935
936             fts.markSendStatus(true);
937             if (!fts.isLastChunk(chunkIndex)) {
938                 fts.incrementChunkIndex();
939             }
940         }
941
942         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
943     }
944
945     @Override protected RaftActorBehavior createBehavior(
946         RaftActorContext actorContext) {
947         return new Leader(actorContext);
948     }
949
950     @Override
951     protected MockRaftActorContext createActorContext() {
952         return createActorContext(leaderActor);
953     }
954
955     @Override
956     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
957         return createActorContext(LEADER_ID, actorRef);
958     }
959
960     private MockRaftActorContext createActorContextWithFollower() {
961         MockRaftActorContext actorContext = createActorContext();
962         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
963                 followerActor.path().toString()).build());
964         return actorContext;
965     }
966
967     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
968         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
969         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
970         configParams.setElectionTimeoutFactor(100000);
971         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
972         context.setConfigParams(configParams);
973         return context;
974     }
975
976     @Test
977     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
978         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
979
980         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
981
982         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
983
984         Follower follower = new Follower(followerActorContext);
985         followerActor.underlyingActor().setBehavior(follower);
986
987         Map<String, String> peerAddresses = new HashMap<>();
988         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
989
990         leaderActorContext.setPeerAddresses(peerAddresses);
991
992         leaderActorContext.getReplicatedLog().removeFrom(0);
993
994         //create 3 entries
995         leaderActorContext.setReplicatedLog(
996                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
997
998         leaderActorContext.setCommitIndex(1);
999
1000         followerActorContext.getReplicatedLog().removeFrom(0);
1001
1002         // follower too has the exact same log entries and has the same commit index
1003         followerActorContext.setReplicatedLog(
1004                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1005
1006         followerActorContext.setCommitIndex(1);
1007
1008         leader = new Leader(leaderActorContext);
1009
1010         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1011
1012         assertEquals(1, appendEntries.getLeaderCommit());
1013         assertEquals(0, appendEntries.getEntries().size());
1014         assertEquals(0, appendEntries.getPrevLogIndex());
1015
1016         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1017                 leaderActor, AppendEntriesReply.class);
1018
1019         assertEquals(2, appendEntriesReply.getLogLastIndex());
1020         assertEquals(1, appendEntriesReply.getLogLastTerm());
1021
1022         // follower returns its next index
1023         assertEquals(2, appendEntriesReply.getLogLastIndex());
1024         assertEquals(1, appendEntriesReply.getLogLastTerm());
1025
1026         follower.close();
1027     }
1028
1029     @Test
1030     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
1031         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1032
1033         MockRaftActorContext leaderActorContext = createActorContext();
1034
1035         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1036         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1037
1038         Follower follower = new Follower(followerActorContext);
1039         followerActor.underlyingActor().setBehavior(follower);
1040
1041         Map<String, String> leaderPeerAddresses = new HashMap<>();
1042         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1043
1044         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1045
1046         leaderActorContext.getReplicatedLog().removeFrom(0);
1047
1048         leaderActorContext.setReplicatedLog(
1049                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1050
1051         leaderActorContext.setCommitIndex(1);
1052
1053         followerActorContext.getReplicatedLog().removeFrom(0);
1054
1055         followerActorContext.setReplicatedLog(
1056                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1057
1058         // follower has the same log entries but its commit index > leaders commit index
1059         followerActorContext.setCommitIndex(2);
1060
1061         leader = new Leader(leaderActorContext);
1062
1063         // Initial heartbeat
1064         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1065
1066         assertEquals(1, appendEntries.getLeaderCommit());
1067         assertEquals(0, appendEntries.getEntries().size());
1068         assertEquals(0, appendEntries.getPrevLogIndex());
1069
1070         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1071                 leaderActor, AppendEntriesReply.class);
1072
1073         assertEquals(2, appendEntriesReply.getLogLastIndex());
1074         assertEquals(1, appendEntriesReply.getLogLastTerm());
1075
1076         leaderActor.underlyingActor().setBehavior(follower);
1077         leader.handleMessage(followerActor, appendEntriesReply);
1078
1079         leaderActor.underlyingActor().clear();
1080         followerActor.underlyingActor().clear();
1081
1082         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1083                 TimeUnit.MILLISECONDS);
1084
1085         leader.handleMessage(leaderActor, new SendHeartBeat());
1086
1087         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1088
1089         assertEquals(2, appendEntries.getLeaderCommit());
1090         assertEquals(0, appendEntries.getEntries().size());
1091         assertEquals(2, appendEntries.getPrevLogIndex());
1092
1093         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1094
1095         assertEquals(2, appendEntriesReply.getLogLastIndex());
1096         assertEquals(1, appendEntriesReply.getLogLastTerm());
1097
1098         assertEquals(2, followerActorContext.getCommitIndex());
1099
1100         follower.close();
1101     }
1102
1103     @Test
1104     public void testHandleAppendEntriesReplyFailure(){
1105         logStart("testHandleAppendEntriesReplyFailure");
1106
1107         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1108
1109         leader = new Leader(leaderActorContext);
1110
1111         // Send initial heartbeat reply with last index.
1112         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
1113
1114         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1115         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
1116
1117         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
1118
1119         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1120
1121         assertEquals(RaftState.Leader, raftActorBehavior.state());
1122
1123         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
1124     }
1125
1126     @Test
1127     public void testHandleAppendEntriesReplySuccess() throws Exception {
1128         logStart("testHandleAppendEntriesReplySuccess");
1129
1130         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1131
1132         leaderActorContext.setReplicatedLog(
1133                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1134
1135         leaderActorContext.setCommitIndex(1);
1136         leaderActorContext.setLastApplied(1);
1137         leaderActorContext.getTermInformation().update(1, "leader");
1138
1139         leader = new Leader(leaderActorContext);
1140
1141         short payloadVersion = 5;
1142         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1143
1144         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1145
1146         assertEquals(RaftState.Leader, raftActorBehavior.state());
1147
1148         assertEquals(2, leaderActorContext.getCommitIndex());
1149
1150         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1151                 leaderActor, ApplyJournalEntries.class);
1152
1153         assertEquals(2, leaderActorContext.getLastApplied());
1154
1155         assertEquals(2, applyJournalEntries.getToIndex());
1156
1157         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1158                 ApplyState.class);
1159
1160         assertEquals(1,applyStateList.size());
1161
1162         ApplyState applyState = applyStateList.get(0);
1163
1164         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1165
1166         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1167         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1168     }
1169
1170     @Test
1171     public void testHandleAppendEntriesReplyUnknownFollower(){
1172         logStart("testHandleAppendEntriesReplyUnknownFollower");
1173
1174         MockRaftActorContext leaderActorContext = createActorContext();
1175
1176         leader = new Leader(leaderActorContext);
1177
1178         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1179
1180         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1181
1182         assertEquals(RaftState.Leader, raftActorBehavior.state());
1183     }
1184
1185     @Test
1186     public void testHandleRequestVoteReply(){
1187         logStart("testHandleRequestVoteReply");
1188
1189         MockRaftActorContext leaderActorContext = createActorContext();
1190
1191         leader = new Leader(leaderActorContext);
1192
1193         // Should be a no-op.
1194         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1195                 new RequestVoteReply(1, true));
1196
1197         assertEquals(RaftState.Leader, raftActorBehavior.state());
1198
1199         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1200
1201         assertEquals(RaftState.Leader, raftActorBehavior.state());
1202     }
1203
1204     @Test
1205     public void testIsolatedLeaderCheckNoFollowers() {
1206         logStart("testIsolatedLeaderCheckNoFollowers");
1207
1208         MockRaftActorContext leaderActorContext = createActorContext();
1209
1210         leader = new Leader(leaderActorContext);
1211         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1212         Assert.assertTrue(behavior instanceof Leader);
1213     }
1214
1215     @Test
1216     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1217         logStart("testIsolatedLeaderCheckTwoFollowers");
1218
1219         new JavaTestKit(getSystem()) {{
1220
1221             ActorRef followerActor1 = getTestActor();
1222             ActorRef followerActor2 = getTestActor();
1223
1224             MockRaftActorContext leaderActorContext = createActorContext();
1225
1226             Map<String, String> peerAddresses = new HashMap<>();
1227             peerAddresses.put("follower-1", followerActor1.path().toString());
1228             peerAddresses.put("follower-2", followerActor2.path().toString());
1229
1230             leaderActorContext.setPeerAddresses(peerAddresses);
1231
1232             leader = new Leader(leaderActorContext);
1233
1234             leader.markFollowerActive("follower-1");
1235             leader.markFollowerActive("follower-2");
1236             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1237             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1238                 behavior instanceof Leader);
1239
1240             // kill 1 follower and verify if that got killed
1241             final JavaTestKit probe = new JavaTestKit(getSystem());
1242             probe.watch(followerActor1);
1243             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1244             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1245             assertEquals(termMsg1.getActor(), followerActor1);
1246
1247             leader.markFollowerInActive("follower-1");
1248             leader.markFollowerActive("follower-2");
1249             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1250             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1251                 behavior instanceof Leader);
1252
1253             // kill 2nd follower and leader should change to Isolated leader
1254             followerActor2.tell(PoisonPill.getInstance(), null);
1255             probe.watch(followerActor2);
1256             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1257             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1258             assertEquals(termMsg2.getActor(), followerActor2);
1259
1260             leader.markFollowerInActive("follower-2");
1261             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1262             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1263                 behavior instanceof IsolatedLeader);
1264         }};
1265     }
1266
1267
1268     @Test
1269     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1270         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1271
1272         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1273
1274         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1275         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1276         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1277
1278         leaderActorContext.setConfigParams(configParams);
1279
1280         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1281
1282         followerActorContext.setConfigParams(configParams);
1283         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1284
1285         Follower follower = new Follower(followerActorContext);
1286         followerActor.underlyingActor().setBehavior(follower);
1287
1288         leaderActorContext.getReplicatedLog().removeFrom(0);
1289         leaderActorContext.setCommitIndex(-1);
1290         leaderActorContext.setLastApplied(-1);
1291
1292         followerActorContext.getReplicatedLog().removeFrom(0);
1293         followerActorContext.setCommitIndex(-1);
1294         followerActorContext.setLastApplied(-1);
1295
1296         leader = new Leader(leaderActorContext);
1297
1298         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1299                 leaderActor, AppendEntriesReply.class);
1300
1301         leader.handleMessage(followerActor, appendEntriesReply);
1302
1303         // Clear initial heartbeat messages
1304
1305         leaderActor.underlyingActor().clear();
1306         followerActor.underlyingActor().clear();
1307
1308         // create 3 entries
1309         leaderActorContext.setReplicatedLog(
1310                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1311         leaderActorContext.setCommitIndex(1);
1312         leaderActorContext.setLastApplied(1);
1313
1314         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1315                 TimeUnit.MILLISECONDS);
1316
1317         leader.handleMessage(leaderActor, new SendHeartBeat());
1318
1319         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1320
1321         // Should send first log entry
1322         assertEquals(1, appendEntries.getLeaderCommit());
1323         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1324         assertEquals(-1, appendEntries.getPrevLogIndex());
1325
1326         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1327
1328         assertEquals(1, appendEntriesReply.getLogLastTerm());
1329         assertEquals(0, appendEntriesReply.getLogLastIndex());
1330
1331         followerActor.underlyingActor().clear();
1332
1333         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1334
1335         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1336
1337         // Should send second log entry
1338         assertEquals(1, appendEntries.getLeaderCommit());
1339         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1340
1341         follower.close();
1342     }
1343
1344     @Test
1345     public void testLaggingFollowerStarvation() throws Exception {
1346         logStart("testLaggingFollowerStarvation");
1347         new JavaTestKit(getSystem()) {{
1348             String leaderActorId = actorFactory.generateActorId("leader");
1349             String follower1ActorId = actorFactory.generateActorId("follower");
1350             String follower2ActorId = actorFactory.generateActorId("follower");
1351
1352             TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
1353                     actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
1354             ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1355             ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1356
1357             MockRaftActorContext leaderActorContext =
1358                     new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1359
1360             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1361             configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1362             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1363
1364             leaderActorContext.setConfigParams(configParams);
1365
1366             leaderActorContext.setReplicatedLog(
1367                     new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1368
1369             Map<String, String> peerAddresses = new HashMap<>();
1370             peerAddresses.put(follower1ActorId,
1371                     follower1Actor.path().toString());
1372             peerAddresses.put(follower2ActorId,
1373                     follower2Actor.path().toString());
1374
1375             leaderActorContext.setPeerAddresses(peerAddresses);
1376             leaderActorContext.getTermInformation().update(1, leaderActorId);
1377
1378             RaftActorBehavior leader = createBehavior(leaderActorContext);
1379
1380             leaderActor.underlyingActor().setBehavior(leader);
1381
1382             for(int i=1;i<6;i++) {
1383                 // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
1384                 RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
1385                 assertTrue(newBehavior == leader);
1386                 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
1387             }
1388
1389             // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
1390             List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
1391
1392             assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
1393                     heartbeats.size() > 1);
1394
1395             // Check if follower-2 got AppendEntries during this time and was not starved
1396             List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
1397
1398             assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
1399                     appendEntries.size() > 1);
1400
1401         }};
1402     }
1403
1404     @Override
1405     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1406             ActorRef actorRef, RaftRPC rpc) throws Exception {
1407         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1408         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1409     }
1410
1411     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1412
1413         private final long electionTimeOutIntervalMillis;
1414         private final int snapshotChunkSize;
1415
1416         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1417             super();
1418             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1419             this.snapshotChunkSize = snapshotChunkSize;
1420         }
1421
1422         @Override
1423         public FiniteDuration getElectionTimeOutInterval() {
1424             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1425         }
1426
1427         @Override
1428         public int getSnapshotChunkSize() {
1429             return snapshotChunkSize;
1430         }
1431     }
1432 }