BUG 2702 : Switch to a low impact scheduling mechanism for IsolatedLeaderCheck
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 package org.opendaylight.controller.cluster.raft.behaviors;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import akka.actor.ActorRef;
7 import akka.actor.PoisonPill;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Uninterruptibles;
15 import com.google.protobuf.ByteString;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.After;
21 import org.junit.Assert;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
24 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
25 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
26 import org.opendaylight.controller.cluster.raft.RaftActorContext;
27 import org.opendaylight.controller.cluster.raft.RaftState;
28 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
29 import org.opendaylight.controller.cluster.raft.SerializationUtils;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
37 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
42 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
45 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
46 import scala.concurrent.duration.FiniteDuration;
47
48 public class LeaderTest extends AbstractRaftActorBehaviorTest {
49
50     static final String FOLLOWER_ID = "follower";
51
52     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
53             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
54
55     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
56             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
57
58     private Leader leader;
59
60     @Override
61     @After
62     public void tearDown() throws Exception {
63         if(leader != null) {
64             leader.close();
65         }
66
67         super.tearDown();
68     }
69
70     @Test
71     public void testHandleMessageForUnknownMessage() throws Exception {
72         logStart("testHandleMessageForUnknownMessage");
73
74         leader = new Leader(createActorContext());
75
76         // handle message should return the Leader state when it receives an
77         // unknown message
78         RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
79         Assert.assertTrue(behavior instanceof Leader);
80     }
81
82     @Test
83     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
84         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
85
86         MockRaftActorContext actorContext = createActorContextWithFollower();
87
88         long term = 1;
89         actorContext.getTermInformation().update(term, "");
90
91         leader = new Leader(actorContext);
92
93         // Leader should send an immediate heartbeat with no entries as follower is inactive.
94         long lastIndex = actorContext.getReplicatedLog().lastIndex();
95         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
96         assertEquals("getTerm", term, appendEntries.getTerm());
97         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
98         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
99         assertEquals("Entries size", 0, appendEntries.getEntries().size());
100
101         // The follower would normally reply - simulate that explicitly here.
102         leader.handleMessage(followerActor, new AppendEntriesReply(
103                 FOLLOWER_ID, term, true, lastIndex - 1, term));
104         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
105
106         followerActor.underlyingActor().clear();
107
108         // Sleep for the heartbeat interval so AppendEntries is sent.
109         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
110                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
111
112         leader.handleMessage(leaderActor, new SendHeartBeat());
113
114         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
115         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
116         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
117         assertEquals("Entries size", 1, appendEntries.getEntries().size());
118         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
119         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
120     }
121
122     @Test
123     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
124         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
125
126         MockRaftActorContext actorContext = createActorContextWithFollower();
127
128         long term = 1;
129         actorContext.getTermInformation().update(term, "");
130
131         leader = new Leader(actorContext);
132
133         // Leader will send an immediate heartbeat - ignore it.
134         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
135
136         // The follower would normally reply - simulate that explicitly here.
137         long lastIndex = actorContext.getReplicatedLog().lastIndex();
138         leader.handleMessage(followerActor, new AppendEntriesReply(
139                 FOLLOWER_ID, term, true, lastIndex, term));
140         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
141
142         followerActor.underlyingActor().clear();
143
144         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
145         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
146                 1, lastIndex + 1, payload);
147         actorContext.getReplicatedLog().append(newEntry);
148         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
149                 new Replicate(null, null, newEntry));
150
151         // State should not change
152         assertTrue(raftBehavior instanceof Leader);
153
154         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
155         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
156         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
157         assertEquals("Entries size", 1, appendEntries.getEntries().size());
158         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
159         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
160         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
161     }
162
163     @Test
164     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
165         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
166
167         MockRaftActorContext actorContext = createActorContext();
168
169         leader = new Leader(actorContext);
170
171         actorContext.setLastApplied(0);
172
173         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
174         long term = actorContext.getTermInformation().getCurrentTerm();
175         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
176                 term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
177
178         actorContext.getReplicatedLog().append(newEntry);
179
180         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
181                 new Replicate(leaderActor, "state-id", newEntry));
182
183         // State should not change
184         assertTrue(raftBehavior instanceof Leader);
185
186         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
187
188         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
189         // one since lastApplied state is 0.
190         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
191                 leaderActor, ApplyState.class);
192         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
193
194         for(int i = 0; i <= newLogIndex - 1; i++ ) {
195             ApplyState applyState = applyStateList.get(i);
196             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
197             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
198         }
199
200         ApplyState last = applyStateList.get((int) newLogIndex - 1);
201         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
202         assertEquals("getIdentifier", "state-id", last.getIdentifier());
203     }
204
205     @Test
206     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
207         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
208
209         MockRaftActorContext actorContext = createActorContextWithFollower();
210
211         Map<String, String> leadersSnapshot = new HashMap<>();
212         leadersSnapshot.put("1", "A");
213         leadersSnapshot.put("2", "B");
214         leadersSnapshot.put("3", "C");
215
216         //clears leaders log
217         actorContext.getReplicatedLog().removeFrom(0);
218
219         final int followersLastIndex = 2;
220         final int snapshotIndex = 3;
221         final int newEntryIndex = 4;
222         final int snapshotTerm = 1;
223         final int currentTerm = 2;
224
225         // set the snapshot variables in replicatedlog
226         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
227         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
228         actorContext.setCommitIndex(followersLastIndex);
229         //set follower timeout to 2 mins, helps during debugging
230         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
231
232         leader = new Leader(actorContext);
233
234         // new entry
235         ReplicatedLogImplEntry entry =
236                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
237                         new MockRaftActorContext.MockPayload("D"));
238
239         //update follower timestamp
240         leader.markFollowerActive(FOLLOWER_ID);
241
242         ByteString bs = toByteString(leadersSnapshot);
243         leader.setSnapshot(Optional.of(bs));
244         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
245         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
246
247         //send first chunk and no InstallSnapshotReply received yet
248         fts.getNextChunk();
249         fts.incrementChunkIndex();
250
251         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
252                 TimeUnit.MILLISECONDS);
253
254         leader.handleMessage(leaderActor, new SendHeartBeat());
255
256         AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
257
258         AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
259
260         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
261
262         //InstallSnapshotReply received
263         fts.markSendStatus(true);
264
265         leader.handleMessage(leaderActor, new SendHeartBeat());
266
267         InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
268                 InstallSnapshot.SERIALIZABLE_CLASS);
269
270         InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
271
272         assertEquals(snapshotIndex, is.getLastIncludedIndex());
273     }
274
275     @Test
276     public void testSendAppendEntriesSnapshotScenario() throws Exception {
277         logStart("testSendAppendEntriesSnapshotScenario");
278
279         MockRaftActorContext actorContext = createActorContextWithFollower();
280
281         Map<String, String> leadersSnapshot = new HashMap<>();
282         leadersSnapshot.put("1", "A");
283         leadersSnapshot.put("2", "B");
284         leadersSnapshot.put("3", "C");
285
286         //clears leaders log
287         actorContext.getReplicatedLog().removeFrom(0);
288
289         final int followersLastIndex = 2;
290         final int snapshotIndex = 3;
291         final int newEntryIndex = 4;
292         final int snapshotTerm = 1;
293         final int currentTerm = 2;
294
295         // set the snapshot variables in replicatedlog
296         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
297         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
298         actorContext.setCommitIndex(followersLastIndex);
299
300         leader = new Leader(actorContext);
301
302         // Leader will send an immediate heartbeat - ignore it.
303         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
304
305         // new entry
306         ReplicatedLogImplEntry entry =
307                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
308                         new MockRaftActorContext.MockPayload("D"));
309
310         //update follower timestamp
311         leader.markFollowerActive(FOLLOWER_ID);
312
313         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
314         RaftActorBehavior raftBehavior = leader.handleMessage(
315                 leaderActor, new Replicate(null, "state-id", entry));
316
317         assertTrue(raftBehavior instanceof Leader);
318
319         MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
320     }
321
322     @Test
323     public void testInitiateInstallSnapshot() throws Exception {
324         logStart("testInitiateInstallSnapshot");
325
326         MockRaftActorContext actorContext = createActorContextWithFollower();
327
328         Map<String, String> leadersSnapshot = new HashMap<>();
329         leadersSnapshot.put("1", "A");
330         leadersSnapshot.put("2", "B");
331         leadersSnapshot.put("3", "C");
332
333         //clears leaders log
334         actorContext.getReplicatedLog().removeFrom(0);
335
336         final int followersLastIndex = 2;
337         final int snapshotIndex = 3;
338         final int newEntryIndex = 4;
339         final int snapshotTerm = 1;
340         final int currentTerm = 2;
341
342         // set the snapshot variables in replicatedlog
343         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
344         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
345         actorContext.setLastApplied(3);
346         actorContext.setCommitIndex(followersLastIndex);
347
348         leader = new Leader(actorContext);
349
350         // Leader will send an immediate heartbeat - ignore it.
351         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
352
353         // set the snapshot as absent and check if capture-snapshot is invoked.
354         leader.setSnapshot(Optional.<ByteString>absent());
355
356         // new entry
357         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
358                 new MockRaftActorContext.MockPayload("D"));
359
360         actorContext.getReplicatedLog().append(entry);
361
362         //update follower timestamp
363         leader.markFollowerActive(FOLLOWER_ID);
364
365         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
366
367         CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
368
369         assertTrue(cs.isInstallSnapshotInitiated());
370         assertEquals(3, cs.getLastAppliedIndex());
371         assertEquals(1, cs.getLastAppliedTerm());
372         assertEquals(4, cs.getLastIndex());
373         assertEquals(2, cs.getLastTerm());
374
375         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
376         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
377
378         List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
379         assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
380     }
381
382     @Test
383     public void testInstallSnapshot() throws Exception {
384         logStart("testInstallSnapshot");
385
386         MockRaftActorContext actorContext = createActorContextWithFollower();
387
388         Map<String, String> leadersSnapshot = new HashMap<>();
389         leadersSnapshot.put("1", "A");
390         leadersSnapshot.put("2", "B");
391         leadersSnapshot.put("3", "C");
392
393         //clears leaders log
394         actorContext.getReplicatedLog().removeFrom(0);
395
396         final int followersLastIndex = 2;
397         final int snapshotIndex = 3;
398         final int snapshotTerm = 1;
399         final int currentTerm = 2;
400
401         // set the snapshot variables in replicatedlog
402         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
403         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
404         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
405         actorContext.setCommitIndex(followersLastIndex);
406
407         leader = new Leader(actorContext);
408
409         // Ignore initial heartbeat.
410         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
411
412         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
413                 new SendInstallSnapshot(toByteString(leadersSnapshot)));
414
415         assertTrue(raftBehavior instanceof Leader);
416
417         // check if installsnapshot gets called with the correct values.
418
419         InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
420                 MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
421
422         assertNotNull(installSnapshot.getData());
423         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
424         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
425
426         assertEquals(currentTerm, installSnapshot.getTerm());
427     }
428
429     @Test
430     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
431         logStart("testHandleInstallSnapshotReplyLastChunk");
432
433         MockRaftActorContext actorContext = createActorContextWithFollower();
434
435         final int followersLastIndex = 2;
436         final int snapshotIndex = 3;
437         final int snapshotTerm = 1;
438         final int currentTerm = 2;
439
440         actorContext.setCommitIndex(followersLastIndex);
441
442         leader = new Leader(actorContext);
443
444         // Ignore initial heartbeat.
445         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
446
447         Map<String, String> leadersSnapshot = new HashMap<>();
448         leadersSnapshot.put("1", "A");
449         leadersSnapshot.put("2", "B");
450         leadersSnapshot.put("3", "C");
451
452         // set the snapshot variables in replicatedlog
453
454         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
455         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
456         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
457
458         ByteString bs = toByteString(leadersSnapshot);
459         leader.setSnapshot(Optional.of(bs));
460         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
461         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
462         while(!fts.isLastChunk(fts.getChunkIndex())) {
463             fts.getNextChunk();
464             fts.incrementChunkIndex();
465         }
466
467         //clears leaders log
468         actorContext.getReplicatedLog().removeFrom(0);
469
470         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
471                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
472
473         assertTrue(raftBehavior instanceof Leader);
474
475         assertEquals(0, leader.followerSnapshotSize());
476         assertEquals(1, leader.followerLogSize());
477         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
478         assertNotNull(fli);
479         assertEquals(snapshotIndex, fli.getMatchIndex());
480         assertEquals(snapshotIndex, fli.getMatchIndex());
481         assertEquals(snapshotIndex + 1, fli.getNextIndex());
482     }
483
484     @Test
485     public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
486         logStart("testSendSnapshotfromInstallSnapshotReply");
487
488         MockRaftActorContext actorContext = createActorContextWithFollower();
489
490         final int followersLastIndex = 2;
491         final int snapshotIndex = 3;
492         final int snapshotTerm = 1;
493         final int currentTerm = 2;
494
495         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
496             @Override
497             public int getSnapshotChunkSize() {
498                 return 50;
499             }
500         };
501         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
502         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
503
504         actorContext.setConfigParams(configParams);
505         actorContext.setCommitIndex(followersLastIndex);
506
507         leader = new Leader(actorContext);
508
509         Map<String, String> leadersSnapshot = new HashMap<>();
510         leadersSnapshot.put("1", "A");
511         leadersSnapshot.put("2", "B");
512         leadersSnapshot.put("3", "C");
513
514         // set the snapshot variables in replicatedlog
515         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
516         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
517         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
518
519         ByteString bs = toByteString(leadersSnapshot);
520         leader.setSnapshot(Optional.of(bs));
521
522         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
523
524         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
525                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
526
527         assertEquals(1, installSnapshot.getChunkIndex());
528         assertEquals(3, installSnapshot.getTotalChunks());
529
530         followerActor.underlyingActor().clear();
531         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
532                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
533
534         installSnapshot = MessageCollectorActor.expectFirstMatching(
535                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
536
537         assertEquals(2, installSnapshot.getChunkIndex());
538         assertEquals(3, installSnapshot.getTotalChunks());
539
540         followerActor.underlyingActor().clear();
541         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
542                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
543
544         installSnapshot = MessageCollectorActor.expectFirstMatching(
545                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
546
547         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
548         followerActor.underlyingActor().clear();
549         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
550                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
551
552         installSnapshot = MessageCollectorActor.getFirstMatching(
553                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
554
555         Assert.assertNull(installSnapshot);
556     }
557
558
559     @Test
560     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
561         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
562
563         MockRaftActorContext actorContext = createActorContextWithFollower();
564
565         final int followersLastIndex = 2;
566         final int snapshotIndex = 3;
567         final int snapshotTerm = 1;
568         final int currentTerm = 2;
569
570         actorContext.setConfigParams(new DefaultConfigParamsImpl(){
571             @Override
572             public int getSnapshotChunkSize() {
573                 return 50;
574             }
575         });
576
577         actorContext.setCommitIndex(followersLastIndex);
578
579         leader = new Leader(actorContext);
580
581         Map<String, String> leadersSnapshot = new HashMap<>();
582         leadersSnapshot.put("1", "A");
583         leadersSnapshot.put("2", "B");
584         leadersSnapshot.put("3", "C");
585
586         // set the snapshot variables in replicatedlog
587         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
588         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
589         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
590
591         ByteString bs = toByteString(leadersSnapshot);
592         leader.setSnapshot(Optional.of(bs));
593
594         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
595
596         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
597                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
598
599         assertEquals(1, installSnapshot.getChunkIndex());
600         assertEquals(3, installSnapshot.getTotalChunks());
601
602         followerActor.underlyingActor().clear();
603
604         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
605                 FOLLOWER_ID, -1, false));
606
607         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
608                 TimeUnit.MILLISECONDS);
609
610         leader.handleMessage(leaderActor, new SendHeartBeat());
611
612         installSnapshot = MessageCollectorActor.expectFirstMatching(
613                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
614
615         assertEquals(1, installSnapshot.getChunkIndex());
616         assertEquals(3, installSnapshot.getTotalChunks());
617     }
618
619     @Test
620     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
621         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
622
623         MockRaftActorContext actorContext = createActorContextWithFollower();
624
625         final int followersLastIndex = 2;
626         final int snapshotIndex = 3;
627         final int snapshotTerm = 1;
628         final int currentTerm = 2;
629
630         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
631             @Override
632             public int getSnapshotChunkSize() {
633                 return 50;
634             }
635         });
636
637         actorContext.setCommitIndex(followersLastIndex);
638
639         leader = new Leader(actorContext);
640
641         Map<String, String> leadersSnapshot = new HashMap<>();
642         leadersSnapshot.put("1", "A");
643         leadersSnapshot.put("2", "B");
644         leadersSnapshot.put("3", "C");
645
646         // set the snapshot variables in replicatedlog
647         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
648         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
649         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
650
651         ByteString bs = toByteString(leadersSnapshot);
652         leader.setSnapshot(Optional.of(bs));
653
654         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
655
656         InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
657                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
658
659         assertEquals(1, installSnapshot.getChunkIndex());
660         assertEquals(3, installSnapshot.getTotalChunks());
661         assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
662
663         int hashCode = installSnapshot.getData().hashCode();
664
665         followerActor.underlyingActor().clear();
666
667         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
668                 FOLLOWER_ID, 1, true));
669
670         installSnapshot = MessageCollectorActor.expectFirstMatching(
671                 followerActor, InstallSnapshotMessages.InstallSnapshot.class);
672
673         assertEquals(2, installSnapshot.getChunkIndex());
674         assertEquals(3, installSnapshot.getTotalChunks());
675         assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
676     }
677
678     @Test
679     public void testFollowerToSnapshotLogic() {
680         logStart("testFollowerToSnapshotLogic");
681
682         MockRaftActorContext actorContext = createActorContext();
683
684         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
685             @Override
686             public int getSnapshotChunkSize() {
687                 return 50;
688             }
689         });
690
691         leader = new Leader(actorContext);
692
693         Map<String, String> leadersSnapshot = new HashMap<>();
694         leadersSnapshot.put("1", "A");
695         leadersSnapshot.put("2", "B");
696         leadersSnapshot.put("3", "C");
697
698         ByteString bs = toByteString(leadersSnapshot);
699         byte[] barray = bs.toByteArray();
700
701         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
702         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
703
704         assertEquals(bs.size(), barray.length);
705
706         int chunkIndex=0;
707         for (int i=0; i < barray.length; i = i + 50) {
708             int j = i + 50;
709             chunkIndex++;
710
711             if (i + 50 > barray.length) {
712                 j = barray.length;
713             }
714
715             ByteString chunk = fts.getNextChunk();
716             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
717             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
718
719             fts.markSendStatus(true);
720             if (!fts.isLastChunk(chunkIndex)) {
721                 fts.incrementChunkIndex();
722             }
723         }
724
725         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
726     }
727
728     @Override protected RaftActorBehavior createBehavior(
729         RaftActorContext actorContext) {
730         return new Leader(actorContext);
731     }
732
733     @Override
734     protected MockRaftActorContext createActorContext() {
735         return createActorContext(leaderActor);
736     }
737
738     @Override
739     protected MockRaftActorContext createActorContext(ActorRef actorRef) {
740         return createActorContext("leader", actorRef);
741     }
742
743     private MockRaftActorContext createActorContextWithFollower() {
744         MockRaftActorContext actorContext = createActorContext();
745         actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
746                 followerActor.path().toString()).build());
747         return actorContext;
748     }
749
750     private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
751         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
752         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
753         configParams.setElectionTimeoutFactor(100000);
754         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
755         context.setConfigParams(configParams);
756         return context;
757     }
758
759     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
760         AbstractRaftActorBehavior behavior;
761
762         @Override public void onReceive(Object message) throws Exception {
763             if(behavior != null) {
764                 behavior.handleMessage(sender(), message);
765             }
766
767             super.onReceive(message);
768         }
769
770         public static Props props() {
771             return Props.create(ForwardMessageToBehaviorActor.class);
772         }
773     }
774
775     @Test
776     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
777         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
778
779         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
780
781         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
782
783         Follower follower = new Follower(followerActorContext);
784         followerActor.underlyingActor().behavior = follower;
785
786         Map<String, String> peerAddresses = new HashMap<>();
787         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
788
789         leaderActorContext.setPeerAddresses(peerAddresses);
790
791         leaderActorContext.getReplicatedLog().removeFrom(0);
792
793         //create 3 entries
794         leaderActorContext.setReplicatedLog(
795                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
796
797         leaderActorContext.setCommitIndex(1);
798
799         followerActorContext.getReplicatedLog().removeFrom(0);
800
801         // follower too has the exact same log entries and has the same commit index
802         followerActorContext.setReplicatedLog(
803                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
804
805         followerActorContext.setCommitIndex(1);
806
807         leader = new Leader(leaderActorContext);
808
809         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
810
811         assertEquals(1, appendEntries.getLeaderCommit());
812         assertEquals(0, appendEntries.getEntries().size());
813         assertEquals(0, appendEntries.getPrevLogIndex());
814
815         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
816                 leaderActor, AppendEntriesReply.class);
817
818         assertEquals(2, appendEntriesReply.getLogLastIndex());
819         assertEquals(1, appendEntriesReply.getLogLastTerm());
820
821         // follower returns its next index
822         assertEquals(2, appendEntriesReply.getLogLastIndex());
823         assertEquals(1, appendEntriesReply.getLogLastTerm());
824
825         follower.close();
826     }
827
828     @Test
829     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
830         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
831
832         MockRaftActorContext leaderActorContext = createActorContext();
833
834         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
835
836         Follower follower = new Follower(followerActorContext);
837         followerActor.underlyingActor().behavior = follower;
838
839         Map<String, String> peerAddresses = new HashMap<>();
840         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
841
842         leaderActorContext.setPeerAddresses(peerAddresses);
843
844         leaderActorContext.getReplicatedLog().removeFrom(0);
845
846         leaderActorContext.setReplicatedLog(
847                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
848
849         leaderActorContext.setCommitIndex(1);
850
851         followerActorContext.getReplicatedLog().removeFrom(0);
852
853         followerActorContext.setReplicatedLog(
854                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
855
856         // follower has the same log entries but its commit index > leaders commit index
857         followerActorContext.setCommitIndex(2);
858
859         leader = new Leader(leaderActorContext);
860
861         // Initial heartbeat
862         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
863
864         assertEquals(1, appendEntries.getLeaderCommit());
865         assertEquals(0, appendEntries.getEntries().size());
866         assertEquals(0, appendEntries.getPrevLogIndex());
867
868         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
869                 leaderActor, AppendEntriesReply.class);
870
871         assertEquals(2, appendEntriesReply.getLogLastIndex());
872         assertEquals(1, appendEntriesReply.getLogLastTerm());
873
874         leaderActor.underlyingActor().behavior = leader;
875         leader.handleMessage(followerActor, appendEntriesReply);
876
877         leaderActor.underlyingActor().clear();
878         followerActor.underlyingActor().clear();
879
880         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
881                 TimeUnit.MILLISECONDS);
882
883         leader.handleMessage(leaderActor, new SendHeartBeat());
884
885         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
886
887         assertEquals(2, appendEntries.getLeaderCommit());
888         assertEquals(0, appendEntries.getEntries().size());
889         assertEquals(2, appendEntries.getPrevLogIndex());
890
891         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
892
893         assertEquals(2, appendEntriesReply.getLogLastIndex());
894         assertEquals(1, appendEntriesReply.getLogLastTerm());
895
896         assertEquals(2, followerActorContext.getCommitIndex());
897
898         follower.close();
899     }
900
901     @Test
902     public void testHandleAppendEntriesReplyFailure(){
903         logStart("testHandleAppendEntriesReplyFailure");
904
905         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
906
907         leader = new Leader(leaderActorContext);
908
909         // Send initial heartbeat reply with last index.
910         leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
911
912         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
913         assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
914
915         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
916
917         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
918
919         assertEquals(RaftState.Leader, raftActorBehavior.state());
920
921         assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
922     }
923
924     @Test
925     public void testHandleAppendEntriesReplySuccess() throws Exception {
926         logStart("testHandleAppendEntriesReplySuccess");
927
928         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
929
930         leaderActorContext.setReplicatedLog(
931                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
932
933         leaderActorContext.setCommitIndex(1);
934         leaderActorContext.setLastApplied(1);
935         leaderActorContext.getTermInformation().update(1, "leader");
936
937         leader = new Leader(leaderActorContext);
938
939         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
940
941         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
942
943         assertEquals(RaftState.Leader, raftActorBehavior.state());
944
945         assertEquals(2, leaderActorContext.getCommitIndex());
946
947         ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
948                 leaderActor, ApplyLogEntries.class);
949
950         assertEquals(2, leaderActorContext.getLastApplied());
951
952         assertEquals(2, applyLogEntries.getToIndex());
953
954         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
955                 ApplyState.class);
956
957         assertEquals(1,applyStateList.size());
958
959         ApplyState applyState = applyStateList.get(0);
960
961         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
962     }
963
964     @Test
965     public void testHandleAppendEntriesReplyUnknownFollower(){
966         logStart("testHandleAppendEntriesReplyUnknownFollower");
967
968         MockRaftActorContext leaderActorContext = createActorContext();
969
970         leader = new Leader(leaderActorContext);
971
972         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
973
974         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
975
976         assertEquals(RaftState.Leader, raftActorBehavior.state());
977     }
978
979     @Test
980     public void testHandleRequestVoteReply(){
981         logStart("testHandleRequestVoteReply");
982
983         MockRaftActorContext leaderActorContext = createActorContext();
984
985         leader = new Leader(leaderActorContext);
986
987         // Should be a no-op.
988         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
989                 new RequestVoteReply(1, true));
990
991         assertEquals(RaftState.Leader, raftActorBehavior.state());
992
993         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
994
995         assertEquals(RaftState.Leader, raftActorBehavior.state());
996     }
997
998     @Test
999     public void testIsolatedLeaderCheckNoFollowers() {
1000         logStart("testIsolatedLeaderCheckNoFollowers");
1001
1002         MockRaftActorContext leaderActorContext = createActorContext();
1003
1004         leader = new Leader(leaderActorContext);
1005         RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1006         Assert.assertTrue(behavior instanceof Leader);
1007     }
1008
1009     @Test
1010     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
1011         logStart("testIsolatedLeaderCheckTwoFollowers");
1012
1013         new JavaTestKit(getSystem()) {{
1014
1015             ActorRef followerActor1 = getTestActor();
1016             ActorRef followerActor2 = getTestActor();
1017
1018             MockRaftActorContext leaderActorContext = createActorContext();
1019
1020             Map<String, String> peerAddresses = new HashMap<>();
1021             peerAddresses.put("follower-1", followerActor1.path().toString());
1022             peerAddresses.put("follower-2", followerActor2.path().toString());
1023
1024             leaderActorContext.setPeerAddresses(peerAddresses);
1025
1026             leader = new Leader(leaderActorContext);
1027
1028             leader.markFollowerActive("follower-1");
1029             leader.markFollowerActive("follower-2");
1030             RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1031             Assert.assertTrue("Behavior not instance of Leader when all followers are active",
1032                 behavior instanceof Leader);
1033
1034             // kill 1 follower and verify if that got killed
1035             final JavaTestKit probe = new JavaTestKit(getSystem());
1036             probe.watch(followerActor1);
1037             followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1038             final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1039             assertEquals(termMsg1.getActor(), followerActor1);
1040
1041             leader.markFollowerInActive("follower-1");
1042             leader.markFollowerActive("follower-2");
1043             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1044             Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
1045                 behavior instanceof Leader);
1046
1047             // kill 2nd follower and leader should change to Isolated leader
1048             followerActor2.tell(PoisonPill.getInstance(), null);
1049             probe.watch(followerActor2);
1050             followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1051             final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1052             assertEquals(termMsg2.getActor(), followerActor2);
1053
1054             leader.markFollowerInActive("follower-2");
1055             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
1056             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1057                 behavior instanceof IsolatedLeader);
1058         }};
1059     }
1060
1061
1062     @Test
1063     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
1064         logStart("testAppendEntryCallAtEndofAppendEntryReply");
1065
1066         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1067
1068         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1069         //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1070         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1071
1072         leaderActorContext.setConfigParams(configParams);
1073
1074         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1075
1076         followerActorContext.setConfigParams(configParams);
1077
1078         Follower follower = new Follower(followerActorContext);
1079         followerActor.underlyingActor().behavior = follower;
1080
1081         leaderActorContext.getReplicatedLog().removeFrom(0);
1082         leaderActorContext.setCommitIndex(-1);
1083         leaderActorContext.setLastApplied(-1);
1084
1085         followerActorContext.getReplicatedLog().removeFrom(0);
1086         followerActorContext.setCommitIndex(-1);
1087         followerActorContext.setLastApplied(-1);
1088
1089         leader = new Leader(leaderActorContext);
1090
1091         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1092                 leaderActor, AppendEntriesReply.class);
1093
1094         leader.handleMessage(followerActor, appendEntriesReply);
1095
1096         // Clear initial heartbeat messages
1097
1098         leaderActor.underlyingActor().clear();
1099         followerActor.underlyingActor().clear();
1100
1101         // create 3 entries
1102         leaderActorContext.setReplicatedLog(
1103                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1104         leaderActorContext.setCommitIndex(1);
1105         leaderActorContext.setLastApplied(1);
1106
1107         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1108                 TimeUnit.MILLISECONDS);
1109
1110         leader.handleMessage(leaderActor, new SendHeartBeat());
1111
1112         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1113
1114         // Should send first log entry
1115         assertEquals(1, appendEntries.getLeaderCommit());
1116         assertEquals(0, appendEntries.getEntries().get(0).getIndex());
1117         assertEquals(-1, appendEntries.getPrevLogIndex());
1118
1119         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1120
1121         assertEquals(1, appendEntriesReply.getLogLastTerm());
1122         assertEquals(0, appendEntriesReply.getLogLastIndex());
1123
1124         followerActor.underlyingActor().clear();
1125
1126         leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
1127
1128         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1129
1130         // Should send second log entry
1131         assertEquals(1, appendEntries.getLeaderCommit());
1132         assertEquals(1, appendEntries.getEntries().get(0).getIndex());
1133
1134         follower.close();
1135     }
1136
1137     @Override
1138     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
1139             ActorRef actorRef, RaftRPC rpc) throws Exception {
1140         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1141         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
1142     }
1143
1144     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
1145
1146         private final long electionTimeOutIntervalMillis;
1147         private final int snapshotChunkSize;
1148
1149         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
1150             super();
1151             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
1152             this.snapshotChunkSize = snapshotChunkSize;
1153         }
1154
1155         @Override
1156         public FiniteDuration getElectionTimeOutInterval() {
1157             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
1158         }
1159
1160         @Override
1161         public int getSnapshotChunkSize() {
1162             return snapshotChunkSize;
1163         }
1164     }
1165 }