Use Files.delete() in LocalSnapshotStore
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.apache.commons.lang3.SerializationUtils;
43 import org.junit.After;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.messaging.MessageSlice;
46 import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
47 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
48 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorContext;
51 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
60 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
61 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
67 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
69 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
70 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
75 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import org.opendaylight.yangtools.concepts.Identifier;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class LeaderTest extends AbstractLeaderTest<Leader> {
83
84     static final String FOLLOWER_ID = "follower";
85     public static final String LEADER_ID = "leader";
86
87     private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
88             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
89
90     private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
91             Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
92
93     private Leader leader;
94     private final short payloadVersion = 5;
95
96     @Override
97     @After
98     public void tearDown() {
99         if (leader != null) {
100             leader.close();
101         }
102
103         super.tearDown();
104     }
105
106     @Test
107     public void testHandleMessageForUnknownMessage() {
108         logStart("testHandleMessageForUnknownMessage");
109
110         leader = new Leader(createActorContext());
111
112         // handle message should null when it receives an unknown message
113         assertNull(leader.handleMessage(followerActor, "foo"));
114     }
115
116     @Test
117     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
118         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
119
120         MockRaftActorContext actorContext = createActorContextWithFollower();
121         actorContext.setCommitIndex(-1);
122         actorContext.setPayloadVersion(payloadVersion);
123
124         long term = 1;
125         actorContext.getTermInformation().update(term, "");
126
127         leader = new Leader(actorContext);
128         actorContext.setCurrentBehavior(leader);
129
130         // Leader should send an immediate heartbeat with no entries as follower is inactive.
131         final long lastIndex = actorContext.getReplicatedLog().lastIndex();
132         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
133         assertEquals("getTerm", term, appendEntries.getTerm());
134         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
135         assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
136         assertEquals("Entries size", 0, appendEntries.getEntries().size());
137         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
138
139         // The follower would normally reply - simulate that explicitly here.
140         leader.handleMessage(followerActor, new AppendEntriesReply(
141                 FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
142         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
143
144         followerActor.underlyingActor().clear();
145
146         // Sleep for the heartbeat interval so AppendEntries is sent.
147         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams()
148                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
149
150         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
151
152         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
153         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
154         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
155         assertEquals("Entries size", 1, appendEntries.getEntries().size());
156         assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
157         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
158         assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
159     }
160
161
162     private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
163         return sendReplicate(actorContext, 1, index);
164     }
165
166     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
167         return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
168     }
169
170     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
171         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
172         actorContext.getReplicatedLog().append(newEntry);
173         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
174     }
175
176     @Test
177     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
178         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
179
180         MockRaftActorContext actorContext = createActorContextWithFollower();
181
182         long term = 1;
183         actorContext.getTermInformation().update(term, "");
184
185         leader = new Leader(actorContext);
186
187         // Leader will send an immediate heartbeat - ignore it.
188         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
189
190         // The follower would normally reply - simulate that explicitly here.
191         long lastIndex = actorContext.getReplicatedLog().lastIndex();
192         leader.handleMessage(followerActor, new AppendEntriesReply(
193                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
194         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
195
196         followerActor.underlyingActor().clear();
197
198         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
199
200         // State should not change
201         assertTrue(raftBehavior instanceof Leader);
202
203         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
204         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
205         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
206         assertEquals("Entries size", 1, appendEntries.getEntries().size());
207         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
208         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
209         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
210         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
211     }
212
213     @Test
214     public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
215         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
216
217         MockRaftActorContext actorContext = createActorContextWithFollower();
218         actorContext.setCommitIndex(-1);
219         actorContext.setLastApplied(-1);
220
221         // The raft context is initialized with a couple log entries. However the commitIndex
222         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
223         // committed and applied. Now it regains leadership with a higher term (2).
224         long prevTerm = actorContext.getTermInformation().getCurrentTerm();
225         long newTerm = prevTerm + 1;
226         actorContext.getTermInformation().update(newTerm, "");
227
228         leader = new Leader(actorContext);
229         actorContext.setCurrentBehavior(leader);
230
231         // Leader will send an immediate heartbeat - ignore it.
232         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
233
234         // The follower replies with the leader's current last index and term, simulating that it is
235         // up to date with the leader.
236         long lastIndex = actorContext.getReplicatedLog().lastIndex();
237         leader.handleMessage(followerActor, new AppendEntriesReply(
238                 FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
239
240         // The commit index should not get updated even though consensus was reached. This is b/c the
241         // last entry's term does match the current term. As per Â§5.4.1, "Raft never commits log entries
242         // from previous terms by counting replicas".
243         assertEquals("Commit Index", -1, actorContext.getCommitIndex());
244
245         followerActor.underlyingActor().clear();
246
247         // Now replicate a new entry with the new term 2.
248         long newIndex = lastIndex + 1;
249         sendReplicate(actorContext, newTerm, newIndex);
250
251         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
252         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
253         assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
254         assertEquals("Entries size", 1, appendEntries.getEntries().size());
255         assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
256         assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
257         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
258
259         // The follower replies with success. The leader should now update the commit index to the new index
260         // as per Â§5.4.1 "once an entry from the current term is committed by counting replicas, then all
261         // prior entries are committed indirectly".
262         leader.handleMessage(followerActor, new AppendEntriesReply(
263                 FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
264
265         assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
266     }
267
268     @Test
269     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
270         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
271
272         MockRaftActorContext actorContext = createActorContextWithFollower();
273         actorContext.setRaftPolicy(createRaftPolicy(true, true));
274
275         long term = 1;
276         actorContext.getTermInformation().update(term, "");
277
278         leader = new Leader(actorContext);
279
280         // Leader will send an immediate heartbeat - ignore it.
281         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
282
283         // The follower would normally reply - simulate that explicitly here.
284         long lastIndex = actorContext.getReplicatedLog().lastIndex();
285         leader.handleMessage(followerActor, new AppendEntriesReply(
286                 FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
287         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
288
289         followerActor.underlyingActor().clear();
290
291         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
292
293         // State should not change
294         assertTrue(raftBehavior instanceof Leader);
295
296         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
297         assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
298         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
299         assertEquals("Entries size", 1, appendEntries.getEntries().size());
300         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
301         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
302         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
303         assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
304     }
305
306     @Test
307     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
308         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
309
310         MockRaftActorContext actorContext = createActorContextWithFollower();
311         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
312             @Override
313             public FiniteDuration getHeartBeatInterval() {
314                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
315             }
316         });
317
318         long term = 1;
319         actorContext.getTermInformation().update(term, "");
320
321         leader = new Leader(actorContext);
322
323         // Leader will send an immediate heartbeat - ignore it.
324         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
325
326         // The follower would normally reply - simulate that explicitly here.
327         long lastIndex = actorContext.getReplicatedLog().lastIndex();
328         leader.handleMessage(followerActor, new AppendEntriesReply(
329                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
330         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
331
332         followerActor.underlyingActor().clear();
333
334         for (int i = 0; i < 5; i++) {
335             sendReplicate(actorContext, lastIndex + i + 1);
336         }
337
338         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
339         // We expect only 1 message to be sent because of two reasons,
340         // - an append entries reply was not received
341         // - the heartbeat interval has not expired
342         // In this scenario if multiple messages are sent they would likely be duplicates
343         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
344     }
345
346     @Test
347     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
348         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
349
350         MockRaftActorContext actorContext = createActorContextWithFollower();
351         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
352             @Override
353             public FiniteDuration getHeartBeatInterval() {
354                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
355             }
356         });
357
358         long term = 1;
359         actorContext.getTermInformation().update(term, "");
360
361         leader = new Leader(actorContext);
362
363         // Leader will send an immediate heartbeat - ignore it.
364         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
365
366         // The follower would normally reply - simulate that explicitly here.
367         long lastIndex = actorContext.getReplicatedLog().lastIndex();
368         leader.handleMessage(followerActor, new AppendEntriesReply(
369                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
370         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
371
372         followerActor.underlyingActor().clear();
373
374         for (int i = 0; i < 3; i++) {
375             sendReplicate(actorContext, lastIndex + i + 1);
376             leader.handleMessage(followerActor, new AppendEntriesReply(
377                     FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
378
379         }
380
381         for (int i = 3; i < 5; i++) {
382             sendReplicate(actorContext, lastIndex + i + 1);
383         }
384
385         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
386         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
387         // get sent to the follower - but not the 5th
388         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
389
390         for (int i = 0; i < 4; i++) {
391             long expected = allMessages.get(i).getEntries().get(0).getIndex();
392             assertEquals(expected, i + 2);
393         }
394     }
395
396     @Test
397     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
398         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
399
400         MockRaftActorContext actorContext = createActorContextWithFollower();
401         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
402             @Override
403             public FiniteDuration getHeartBeatInterval() {
404                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
405             }
406         });
407
408         long term = 1;
409         actorContext.getTermInformation().update(term, "");
410
411         leader = new Leader(actorContext);
412
413         // Leader will send an immediate heartbeat - ignore it.
414         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
415
416         // The follower would normally reply - simulate that explicitly here.
417         long lastIndex = actorContext.getReplicatedLog().lastIndex();
418         leader.handleMessage(followerActor, new AppendEntriesReply(
419                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
420         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
421
422         followerActor.underlyingActor().clear();
423
424         sendReplicate(actorContext, lastIndex + 1);
425
426         // Wait slightly longer than heartbeat duration
427         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
428
429         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
430
431         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
432         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
433
434         assertEquals(1, allMessages.get(0).getEntries().size());
435         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
436         assertEquals(1, allMessages.get(1).getEntries().size());
437         assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
438
439     }
440
441     @Test
442     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
443         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
444
445         MockRaftActorContext actorContext = createActorContextWithFollower();
446         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
447             @Override
448             public FiniteDuration getHeartBeatInterval() {
449                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
450             }
451         });
452
453         long term = 1;
454         actorContext.getTermInformation().update(term, "");
455
456         leader = new Leader(actorContext);
457
458         // Leader will send an immediate heartbeat - ignore it.
459         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
460
461         // The follower would normally reply - simulate that explicitly here.
462         long lastIndex = actorContext.getReplicatedLog().lastIndex();
463         leader.handleMessage(followerActor, new AppendEntriesReply(
464                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
465         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
466
467         followerActor.underlyingActor().clear();
468
469         for (int i = 0; i < 3; i++) {
470             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
471             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
472         }
473
474         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
475         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
476     }
477
478     @Test
479     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
480         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
481
482         MockRaftActorContext actorContext = createActorContextWithFollower();
483         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
484             @Override
485             public FiniteDuration getHeartBeatInterval() {
486                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
487             }
488         });
489
490         long term = 1;
491         actorContext.getTermInformation().update(term, "");
492
493         leader = new Leader(actorContext);
494
495         // Leader will send an immediate heartbeat - ignore it.
496         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
497
498         // The follower would normally reply - simulate that explicitly here.
499         long lastIndex = actorContext.getReplicatedLog().lastIndex();
500         leader.handleMessage(followerActor, new AppendEntriesReply(
501                 FOLLOWER_ID, term, true, lastIndex, term, (short)0));
502         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
503
504         followerActor.underlyingActor().clear();
505
506         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
507         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
508         sendReplicate(actorContext, lastIndex + 1);
509
510         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
511         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
512
513         assertEquals(0, allMessages.get(0).getEntries().size());
514         assertEquals(1, allMessages.get(1).getEntries().size());
515     }
516
517
518     @Test
519     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
520         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
521
522         MockRaftActorContext actorContext = createActorContext();
523
524         leader = new Leader(actorContext);
525
526         actorContext.setLastApplied(0);
527
528         long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
529         long term = actorContext.getTermInformation().getCurrentTerm();
530         ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
531                 newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
532
533         actorContext.getReplicatedLog().append(newEntry);
534
535         final Identifier id = new MockIdentifier("state-id");
536         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
537                 new Replicate(leaderActor, id, newEntry, true));
538
539         // State should not change
540         assertTrue(raftBehavior instanceof Leader);
541
542         assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
543
544         // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
545         // one since lastApplied state is 0.
546         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
547                 leaderActor, ApplyState.class);
548         assertEquals("ApplyState count", newLogIndex, applyStateList.size());
549
550         for (int i = 0; i <= newLogIndex - 1; i++) {
551             ApplyState applyState = applyStateList.get(i);
552             assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
553             assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
554         }
555
556         ApplyState last = applyStateList.get((int) newLogIndex - 1);
557         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
558         assertEquals("getIdentifier", id, last.getIdentifier());
559     }
560
561     @Test
562     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
563         logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
564
565         final MockRaftActorContext actorContext = createActorContextWithFollower();
566
567         Map<String, String> leadersSnapshot = new HashMap<>();
568         leadersSnapshot.put("1", "A");
569         leadersSnapshot.put("2", "B");
570         leadersSnapshot.put("3", "C");
571
572         //clears leaders log
573         actorContext.getReplicatedLog().removeFrom(0);
574
575         final int commitIndex = 3;
576         final int snapshotIndex = 2;
577         final int snapshotTerm = 1;
578
579         // set the snapshot variables in replicatedlog
580         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
581         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
582         actorContext.setCommitIndex(commitIndex);
583         //set follower timeout to 2 mins, helps during debugging
584         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
585
586         leader = new Leader(actorContext);
587
588         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
589         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
590
591         //update follower timestamp
592         leader.markFollowerActive(FOLLOWER_ID);
593
594         ByteString bs = toByteString(leadersSnapshot);
595         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
596                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
597                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
598         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
599                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
600         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
601         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
602
603         //send first chunk and no InstallSnapshotReply received yet
604         fts.getNextChunk();
605         fts.incrementChunkIndex();
606
607         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
608                 TimeUnit.MILLISECONDS);
609
610         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
611
612         AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
613
614         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
615
616         //InstallSnapshotReply received
617         fts.markSendStatus(true);
618
619         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
620
621         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
622
623         assertEquals(commitIndex, is.getLastIncludedIndex());
624     }
625
626     @Test
627     public void testSendAppendEntriesSnapshotScenario() {
628         logStart("testSendAppendEntriesSnapshotScenario");
629
630         final MockRaftActorContext actorContext = createActorContextWithFollower();
631
632         Map<String, String> leadersSnapshot = new HashMap<>();
633         leadersSnapshot.put("1", "A");
634         leadersSnapshot.put("2", "B");
635         leadersSnapshot.put("3", "C");
636
637         //clears leaders log
638         actorContext.getReplicatedLog().removeFrom(0);
639
640         final int followersLastIndex = 2;
641         final int snapshotIndex = 3;
642         final int newEntryIndex = 4;
643         final int snapshotTerm = 1;
644         final int currentTerm = 2;
645
646         // set the snapshot variables in replicatedlog
647         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
648         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
649         actorContext.setCommitIndex(followersLastIndex);
650
651         leader = new Leader(actorContext);
652
653         // Leader will send an immediate heartbeat - ignore it.
654         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
655
656         // new entry
657         SimpleReplicatedLogEntry entry =
658                 new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
659                         new MockRaftActorContext.MockPayload("D"));
660
661         actorContext.getReplicatedLog().append(entry);
662
663         //update follower timestamp
664         leader.markFollowerActive(FOLLOWER_ID);
665
666         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
667         RaftActorBehavior raftBehavior = leader.handleMessage(
668                 leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
669
670         assertTrue(raftBehavior instanceof Leader);
671
672         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
673     }
674
675     @Test
676     public void testInitiateInstallSnapshot() {
677         logStart("testInitiateInstallSnapshot");
678
679         MockRaftActorContext actorContext = createActorContextWithFollower();
680
681         //clears leaders log
682         actorContext.getReplicatedLog().removeFrom(0);
683
684         final int followersLastIndex = 2;
685         final int snapshotIndex = 3;
686         final int newEntryIndex = 4;
687         final int snapshotTerm = 1;
688         final int currentTerm = 2;
689
690         // set the snapshot variables in replicatedlog
691         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
692         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
693         actorContext.setLastApplied(3);
694         actorContext.setCommitIndex(followersLastIndex);
695
696         leader = new Leader(actorContext);
697
698         // Leader will send an immediate heartbeat - ignore it.
699         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
700
701         // set the snapshot as absent and check if capture-snapshot is invoked.
702         leader.setSnapshotHolder(null);
703
704         // new entry
705         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
706                 new MockRaftActorContext.MockPayload("D"));
707
708         actorContext.getReplicatedLog().append(entry);
709
710         //update follower timestamp
711         leader.markFollowerActive(FOLLOWER_ID);
712
713         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
714
715         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
716
717         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
718
719         assertEquals(3, cs.getLastAppliedIndex());
720         assertEquals(1, cs.getLastAppliedTerm());
721         assertEquals(4, cs.getLastIndex());
722         assertEquals(2, cs.getLastTerm());
723
724         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
725         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
726
727         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
728     }
729
730     @Test
731     public void testInitiateForceInstallSnapshot() throws Exception {
732         logStart("testInitiateForceInstallSnapshot");
733
734         MockRaftActorContext actorContext = createActorContextWithFollower();
735
736         final int followersLastIndex = 2;
737         final int snapshotIndex = -1;
738         final int newEntryIndex = 4;
739         final int snapshotTerm = -1;
740         final int currentTerm = 2;
741
742         // set the snapshot variables in replicatedlog
743         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
744         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
745         actorContext.setLastApplied(3);
746         actorContext.setCommitIndex(followersLastIndex);
747
748         actorContext.getReplicatedLog().removeFrom(0);
749
750         AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
751         actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
752
753         leader = new Leader(actorContext);
754         actorContext.setCurrentBehavior(leader);
755
756         // Leader will send an immediate heartbeat - ignore it.
757         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
758
759         // set the snapshot as absent and check if capture-snapshot is invoked.
760         leader.setSnapshotHolder(null);
761
762         for (int i = 0; i < 4; i++) {
763             actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
764                     new MockRaftActorContext.MockPayload("X" + i)));
765         }
766
767         // new entry
768         SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
769                 new MockRaftActorContext.MockPayload("D"));
770
771         actorContext.getReplicatedLog().append(entry);
772
773         //update follower timestamp
774         leader.markFollowerActive(FOLLOWER_ID);
775
776         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
777         // installed with a SendInstallSnapshot
778         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
779                 RaftVersions.CURRENT_VERSION));
780
781         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
782
783         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
784         assertEquals(3, cs.getLastAppliedIndex());
785         assertEquals(1, cs.getLastAppliedTerm());
786         assertEquals(4, cs.getLastIndex());
787         assertEquals(2, cs.getLastTerm());
788
789         assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
790         assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
791
792         MessageCollectorActor.clearMessages(followerActor);
793
794         // Sending Replicate message should not initiate another capture since the first is in progress.
795         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
796         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
797
798         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
799         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
800                 RaftVersions.CURRENT_VERSION));
801         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
802
803         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
804         final byte[] bytes = new byte[]{1, 2, 3};
805         installSnapshotStream.get().get().write(bytes);
806         actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
807                 Runtime.getRuntime().totalMemory());
808         MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
809
810         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
811         MessageCollectorActor.clearMessages(followerActor);
812         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
813                 RaftVersions.CURRENT_VERSION));
814         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
815     }
816
817
818     @Test
819     public void testInstallSnapshot() {
820         logStart("testInstallSnapshot");
821
822         final MockRaftActorContext actorContext = createActorContextWithFollower();
823
824         Map<String, String> leadersSnapshot = new HashMap<>();
825         leadersSnapshot.put("1", "A");
826         leadersSnapshot.put("2", "B");
827         leadersSnapshot.put("3", "C");
828
829         //clears leaders log
830         actorContext.getReplicatedLog().removeFrom(0);
831
832         final int lastAppliedIndex = 3;
833         final int snapshotIndex = 2;
834         final int snapshotTerm = 1;
835         final int currentTerm = 2;
836
837         // set the snapshot variables in replicatedlog
838         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
839         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
840         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
841         actorContext.setCommitIndex(lastAppliedIndex);
842         actorContext.setLastApplied(lastAppliedIndex);
843
844         leader = new Leader(actorContext);
845
846         // Initial heartbeat.
847         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
848
849         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
850         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
851
852         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
853         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
854                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
855
856         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
857                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
858
859         assertTrue(raftBehavior instanceof Leader);
860
861         // check if installsnapshot gets called with the correct values.
862
863         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
864                 InstallSnapshot.class);
865
866         assertNotNull(installSnapshot.getData());
867         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
868         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
869
870         assertEquals(currentTerm, installSnapshot.getTerm());
871     }
872
873     @Test
874     public void testForceInstallSnapshot() {
875         logStart("testForceInstallSnapshot");
876
877         final MockRaftActorContext actorContext = createActorContextWithFollower();
878
879         Map<String, String> leadersSnapshot = new HashMap<>();
880         leadersSnapshot.put("1", "A");
881         leadersSnapshot.put("2", "B");
882         leadersSnapshot.put("3", "C");
883
884         final int lastAppliedIndex = 3;
885         final int snapshotIndex = -1;
886         final int snapshotTerm = -1;
887         final int currentTerm = 2;
888
889         // set the snapshot variables in replicatedlog
890         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
891         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
892         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
893         actorContext.setCommitIndex(lastAppliedIndex);
894         actorContext.setLastApplied(lastAppliedIndex);
895
896         leader = new Leader(actorContext);
897
898         // Initial heartbeat.
899         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
900
901         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
902         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
903
904         byte[] bytes = toByteString(leadersSnapshot).toByteArray();
905         Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
906                 lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
907
908         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
909                 new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
910
911         assertTrue(raftBehavior instanceof Leader);
912
913         // check if installsnapshot gets called with the correct values.
914
915         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
916                 InstallSnapshot.class);
917
918         assertNotNull(installSnapshot.getData());
919         assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
920         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
921
922         assertEquals(currentTerm, installSnapshot.getTerm());
923     }
924
925     @Test
926     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
927         logStart("testHandleInstallSnapshotReplyLastChunk");
928
929         MockRaftActorContext actorContext = createActorContextWithFollower();
930
931         final int commitIndex = 3;
932         final int snapshotIndex = 2;
933         final int snapshotTerm = 1;
934         final int currentTerm = 2;
935
936         actorContext.setCommitIndex(commitIndex);
937
938         leader = new Leader(actorContext);
939         actorContext.setCurrentBehavior(leader);
940
941         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
942         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
943
944         // Ignore initial heartbeat.
945         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
946
947         Map<String, String> leadersSnapshot = new HashMap<>();
948         leadersSnapshot.put("1", "A");
949         leadersSnapshot.put("2", "B");
950         leadersSnapshot.put("3", "C");
951
952         // set the snapshot variables in replicatedlog
953
954         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
955         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
956         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
957
958         ByteString bs = toByteString(leadersSnapshot);
959         leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
960                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
961                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
962         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
963                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
964         fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
965         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
966         while (!fts.isLastChunk(fts.getChunkIndex())) {
967             fts.getNextChunk();
968             fts.incrementChunkIndex();
969         }
970
971         //clears leaders log
972         actorContext.getReplicatedLog().removeFrom(0);
973
974         RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
975                 new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
976
977         assertTrue(raftBehavior instanceof Leader);
978
979         assertEquals(1, leader.followerLogSize());
980         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
981         assertNotNull(fli);
982         assertNull(fli.getInstallSnapshotState());
983         assertEquals(commitIndex, fli.getMatchIndex());
984         assertEquals(commitIndex + 1, fli.getNextIndex());
985         assertFalse(leader.hasSnapshot());
986     }
987
988     @Test
989     public void testSendSnapshotfromInstallSnapshotReply() {
990         logStart("testSendSnapshotfromInstallSnapshotReply");
991
992         MockRaftActorContext actorContext = createActorContextWithFollower();
993
994         final int commitIndex = 3;
995         final int snapshotIndex = 2;
996         final int snapshotTerm = 1;
997         final int currentTerm = 2;
998
999         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
1000             @Override
1001             public int getSnapshotChunkSize() {
1002                 return 50;
1003             }
1004         };
1005         configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
1006         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1007
1008         actorContext.setConfigParams(configParams);
1009         actorContext.setCommitIndex(commitIndex);
1010
1011         leader = new Leader(actorContext);
1012         actorContext.setCurrentBehavior(leader);
1013
1014         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1015         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1016
1017         Map<String, String> leadersSnapshot = new HashMap<>();
1018         leadersSnapshot.put("1", "A");
1019         leadersSnapshot.put("2", "B");
1020         leadersSnapshot.put("3", "C");
1021
1022         // set the snapshot variables in replicatedlog
1023         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1024         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1025         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1026
1027         ByteString bs = toByteString(leadersSnapshot);
1028         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1029                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1030                 -1, null, null);
1031
1032         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1033
1034         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1035                 InstallSnapshot.class);
1036
1037         assertEquals(1, installSnapshot.getChunkIndex());
1038         assertEquals(3, installSnapshot.getTotalChunks());
1039
1040         followerActor.underlyingActor().clear();
1041         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1042                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1043
1044         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1045
1046         assertEquals(2, installSnapshot.getChunkIndex());
1047         assertEquals(3, installSnapshot.getTotalChunks());
1048
1049         followerActor.underlyingActor().clear();
1050         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1051                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1052
1053         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1054
1055         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
1056         followerActor.underlyingActor().clear();
1057         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1058                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
1059
1060         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
1061
1062         assertNull(installSnapshot);
1063     }
1064
1065
1066     @Test
1067     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
1068         logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
1069
1070         MockRaftActorContext actorContext = createActorContextWithFollower();
1071
1072         final int commitIndex = 3;
1073         final int snapshotIndex = 2;
1074         final int snapshotTerm = 1;
1075         final int currentTerm = 2;
1076
1077         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1078             @Override
1079             public int getSnapshotChunkSize() {
1080                 return 50;
1081             }
1082         });
1083
1084         actorContext.setCommitIndex(commitIndex);
1085
1086         leader = new Leader(actorContext);
1087
1088         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1089         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1090
1091         Map<String, String> leadersSnapshot = new HashMap<>();
1092         leadersSnapshot.put("1", "A");
1093         leadersSnapshot.put("2", "B");
1094         leadersSnapshot.put("3", "C");
1095
1096         // set the snapshot variables in replicatedlog
1097         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1098         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1099         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1100
1101         ByteString bs = toByteString(leadersSnapshot);
1102         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1103                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1104                 -1, null, null);
1105
1106         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
1107         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1108
1109         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1110                 InstallSnapshot.class);
1111
1112         assertEquals(1, installSnapshot.getChunkIndex());
1113         assertEquals(3, installSnapshot.getTotalChunks());
1114
1115         followerActor.underlyingActor().clear();
1116
1117         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
1118                 FOLLOWER_ID, -1, false));
1119
1120         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1121                 TimeUnit.MILLISECONDS);
1122
1123         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1124
1125         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1126
1127         assertEquals(1, installSnapshot.getChunkIndex());
1128         assertEquals(3, installSnapshot.getTotalChunks());
1129     }
1130
1131     @Test
1132     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
1133         logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
1134
1135         MockRaftActorContext actorContext = createActorContextWithFollower();
1136
1137         final int commitIndex = 3;
1138         final int snapshotIndex = 2;
1139         final int snapshotTerm = 1;
1140         final int currentTerm = 2;
1141
1142         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
1143             @Override
1144             public int getSnapshotChunkSize() {
1145                 return 50;
1146             }
1147         });
1148
1149         actorContext.setCommitIndex(commitIndex);
1150
1151         leader = new Leader(actorContext);
1152
1153         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
1154         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
1155
1156         Map<String, String> leadersSnapshot = new HashMap<>();
1157         leadersSnapshot.put("1", "A");
1158         leadersSnapshot.put("2", "B");
1159         leadersSnapshot.put("3", "C");
1160
1161         // set the snapshot variables in replicatedlog
1162         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
1163         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
1164         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
1165
1166         ByteString bs = toByteString(leadersSnapshot);
1167         Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
1168                 Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
1169                 -1, null, null);
1170
1171         leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
1172
1173         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
1174                 InstallSnapshot.class);
1175
1176         assertEquals(1, installSnapshot.getChunkIndex());
1177         assertEquals(3, installSnapshot.getTotalChunks());
1178         assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
1179                 installSnapshot.getLastChunkHashCode().get().intValue());
1180
1181         final int hashCode = Arrays.hashCode(installSnapshot.getData());
1182
1183         followerActor.underlyingActor().clear();
1184
1185         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
1186                 FOLLOWER_ID, 1, true));
1187
1188         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
1189
1190         assertEquals(2, installSnapshot.getChunkIndex());
1191         assertEquals(3, installSnapshot.getTotalChunks());
1192         assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
1193     }
1194
1195     @Test
1196     public void testLeaderInstallSnapshotState() throws IOException {
1197         logStart("testLeaderInstallSnapshotState");
1198
1199         Map<String, String> leadersSnapshot = new HashMap<>();
1200         leadersSnapshot.put("1", "A");
1201         leadersSnapshot.put("2", "B");
1202         leadersSnapshot.put("3", "C");
1203
1204         ByteString bs = toByteString(leadersSnapshot);
1205         byte[] barray = bs.toByteArray();
1206
1207         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
1208         fts.setSnapshotBytes(ByteSource.wrap(barray));
1209
1210         assertEquals(bs.size(), barray.length);
1211
1212         int chunkIndex = 0;
1213         for (int i = 0; i < barray.length; i = i + 50) {
1214             int length = i + 50;
1215             chunkIndex++;
1216
1217             if (i + 50 > barray.length) {
1218                 length = barray.length;
1219             }
1220
1221             byte[] chunk = fts.getNextChunk();
1222             assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length);
1223             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
1224
1225             fts.markSendStatus(true);
1226             if (!fts.isLastChunk(chunkIndex)) {
1227                 fts.incrementChunkIndex();
1228             }
1229         }
1230
1231         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
1232         fts.close();
1233     }
1234
1235     @Override
1236     protected Leader createBehavior(final RaftActorContext actorContext) {
1237         return new Leader(actorContext);
1238     }
1239
1240     @Override
1241     protected MockRaftActorContext createActorContext() {
1242         return createActorContext(leaderActor);
1243     }
1244
1245     @Override
1246     protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
1247         return createActorContext(LEADER_ID, actorRef);
1248     }
1249
1250     private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
1251         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1252         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
1253         configParams.setElectionTimeoutFactor(100000);
1254         MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
1255         context.setConfigParams(configParams);
1256         context.setPayloadVersion(payloadVersion);
1257         return context;
1258     }
1259
1260     private MockRaftActorContext createActorContextWithFollower() {
1261         MockRaftActorContext actorContext = createActorContext();
1262         actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
1263                 followerActor.path().toString()).build());
1264         return actorContext;
1265     }
1266
1267     private MockRaftActorContext createFollowerActorContextWithLeader() {
1268         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1269         DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
1270         followerConfig.setElectionTimeoutFactor(10000);
1271         followerActorContext.setConfigParams(followerConfig);
1272         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1273         return followerActorContext;
1274     }
1275
1276     @Test
1277     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
1278         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
1279
1280         final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1281
1282         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1283
1284         Follower follower = new Follower(followerActorContext);
1285         followerActor.underlyingActor().setBehavior(follower);
1286         followerActorContext.setCurrentBehavior(follower);
1287
1288         Map<String, String> peerAddresses = new HashMap<>();
1289         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1290
1291         leaderActorContext.setPeerAddresses(peerAddresses);
1292
1293         leaderActorContext.getReplicatedLog().removeFrom(0);
1294
1295         //create 3 entries
1296         leaderActorContext.setReplicatedLog(
1297                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1298
1299         leaderActorContext.setCommitIndex(1);
1300
1301         followerActorContext.getReplicatedLog().removeFrom(0);
1302
1303         // follower too has the exact same log entries and has the same commit index
1304         followerActorContext.setReplicatedLog(
1305                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1306
1307         followerActorContext.setCommitIndex(1);
1308
1309         leader = new Leader(leaderActorContext);
1310         leaderActorContext.setCurrentBehavior(leader);
1311
1312         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1313
1314         assertEquals(-1, appendEntries.getLeaderCommit());
1315         assertEquals(0, appendEntries.getEntries().size());
1316         assertEquals(0, appendEntries.getPrevLogIndex());
1317
1318         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1319                 leaderActor, AppendEntriesReply.class);
1320
1321         assertEquals(2, appendEntriesReply.getLogLastIndex());
1322         assertEquals(1, appendEntriesReply.getLogLastTerm());
1323
1324         // follower returns its next index
1325         assertEquals(2, appendEntriesReply.getLogLastIndex());
1326         assertEquals(1, appendEntriesReply.getLogLastTerm());
1327
1328         follower.close();
1329     }
1330
1331     @Test
1332     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
1333         logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
1334
1335         final MockRaftActorContext leaderActorContext = createActorContext();
1336
1337         MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
1338         followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
1339
1340         Follower follower = new Follower(followerActorContext);
1341         followerActor.underlyingActor().setBehavior(follower);
1342         followerActorContext.setCurrentBehavior(follower);
1343
1344         Map<String, String> leaderPeerAddresses = new HashMap<>();
1345         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
1346
1347         leaderActorContext.setPeerAddresses(leaderPeerAddresses);
1348
1349         leaderActorContext.getReplicatedLog().removeFrom(0);
1350
1351         leaderActorContext.setReplicatedLog(
1352                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1353
1354         leaderActorContext.setCommitIndex(1);
1355
1356         followerActorContext.getReplicatedLog().removeFrom(0);
1357
1358         followerActorContext.setReplicatedLog(
1359                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1360
1361         // follower has the same log entries but its commit index > leaders commit index
1362         followerActorContext.setCommitIndex(2);
1363
1364         leader = new Leader(leaderActorContext);
1365
1366         // Initial heartbeat
1367         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1368
1369         assertEquals(-1, appendEntries.getLeaderCommit());
1370         assertEquals(0, appendEntries.getEntries().size());
1371         assertEquals(0, appendEntries.getPrevLogIndex());
1372
1373         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
1374                 leaderActor, AppendEntriesReply.class);
1375
1376         assertEquals(2, appendEntriesReply.getLogLastIndex());
1377         assertEquals(1, appendEntriesReply.getLogLastTerm());
1378
1379         leaderActor.underlyingActor().setBehavior(follower);
1380         leader.handleMessage(followerActor, appendEntriesReply);
1381
1382         leaderActor.underlyingActor().clear();
1383         followerActor.underlyingActor().clear();
1384
1385         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
1386                 TimeUnit.MILLISECONDS);
1387
1388         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
1389
1390         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1391
1392         assertEquals(2, appendEntries.getLeaderCommit());
1393         assertEquals(0, appendEntries.getEntries().size());
1394         assertEquals(2, appendEntries.getPrevLogIndex());
1395
1396         appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1397
1398         assertEquals(2, appendEntriesReply.getLogLastIndex());
1399         assertEquals(1, appendEntriesReply.getLogLastTerm());
1400
1401         assertEquals(2, followerActorContext.getCommitIndex());
1402
1403         follower.close();
1404     }
1405
1406     @Test
1407     public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() {
1408         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
1409
1410         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1411         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1412                 new FiniteDuration(1000, TimeUnit.SECONDS));
1413
1414         leaderActorContext.setReplicatedLog(
1415                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1416         long leaderCommitIndex = 2;
1417         leaderActorContext.setCommitIndex(leaderCommitIndex);
1418         leaderActorContext.setLastApplied(leaderCommitIndex);
1419
1420         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1421         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1422
1423         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1424
1425         followerActorContext.setReplicatedLog(
1426                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1427         followerActorContext.setCommitIndex(0);
1428         followerActorContext.setLastApplied(0);
1429
1430         Follower follower = new Follower(followerActorContext);
1431         followerActor.underlyingActor().setBehavior(follower);
1432
1433         leader = new Leader(leaderActorContext);
1434
1435         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1436         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1437                 AppendEntriesReply.class);
1438
1439         MessageCollectorActor.clearMessages(followerActor);
1440         MessageCollectorActor.clearMessages(leaderActor);
1441
1442         // Verify initial AppendEntries sent.
1443         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1444         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1445         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1446
1447         leaderActor.underlyingActor().setBehavior(leader);
1448
1449         leader.handleMessage(followerActor, appendEntriesReply);
1450
1451         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1452         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1453
1454         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1455         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1456         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1457
1458         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
1459         assertEquals("First entry data", leadersSecondLogEntry.getData(),
1460                 appendEntries.getEntries().get(0).getData());
1461         assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
1462         assertEquals("Second entry data", leadersThirdLogEntry.getData(),
1463                 appendEntries.getEntries().get(1).getData());
1464
1465         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1466         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
1467
1468         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1469
1470         ApplyState applyState = applyStateList.get(0);
1471         assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1472         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1473         assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
1474                 applyState.getReplicatedLogEntry().getData());
1475
1476         applyState = applyStateList.get(1);
1477         assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
1478         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1479         assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
1480                 applyState.getReplicatedLogEntry().getData());
1481
1482         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
1483         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
1484     }
1485
1486     @Test
1487     public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
1488         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
1489
1490         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1491         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1492                 new FiniteDuration(1000, TimeUnit.SECONDS));
1493
1494         leaderActorContext.setReplicatedLog(
1495                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
1496         long leaderCommitIndex = 1;
1497         leaderActorContext.setCommitIndex(leaderCommitIndex);
1498         leaderActorContext.setLastApplied(leaderCommitIndex);
1499
1500         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1501         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1502
1503         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1504
1505         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1506         followerActorContext.setCommitIndex(-1);
1507         followerActorContext.setLastApplied(-1);
1508
1509         Follower follower = new Follower(followerActorContext);
1510         followerActor.underlyingActor().setBehavior(follower);
1511         followerActorContext.setCurrentBehavior(follower);
1512
1513         leader = new Leader(leaderActorContext);
1514
1515         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1516         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1517                 AppendEntriesReply.class);
1518
1519         MessageCollectorActor.clearMessages(followerActor);
1520         MessageCollectorActor.clearMessages(leaderActor);
1521
1522         // Verify initial AppendEntries sent with the leader's current commit index.
1523         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1524         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1525         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1526
1527         leaderActor.underlyingActor().setBehavior(leader);
1528         leaderActorContext.setCurrentBehavior(leader);
1529
1530         leader.handleMessage(followerActor, appendEntriesReply);
1531
1532         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1533         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1534
1535         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1536         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1537         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1538
1539         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1540         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1541                 appendEntries.getEntries().get(0).getData());
1542         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1543         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1544                 appendEntries.getEntries().get(1).getData());
1545
1546         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1547         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1548
1549         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1550
1551         ApplyState applyState = applyStateList.get(0);
1552         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1553         assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1554         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1555                 applyState.getReplicatedLogEntry().getData());
1556
1557         applyState = applyStateList.get(1);
1558         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1559         assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
1560         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1561                 applyState.getReplicatedLogEntry().getData());
1562
1563         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1564         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1565     }
1566
1567     @Test
1568     public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() {
1569         logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
1570
1571         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1572         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1573                 new FiniteDuration(1000, TimeUnit.SECONDS));
1574
1575         leaderActorContext.setReplicatedLog(
1576                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1577         long leaderCommitIndex = 1;
1578         leaderActorContext.setCommitIndex(leaderCommitIndex);
1579         leaderActorContext.setLastApplied(leaderCommitIndex);
1580
1581         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1582         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1583
1584         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1585
1586         followerActorContext.setReplicatedLog(
1587                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
1588         followerActorContext.setCommitIndex(-1);
1589         followerActorContext.setLastApplied(-1);
1590
1591         Follower follower = new Follower(followerActorContext);
1592         followerActor.underlyingActor().setBehavior(follower);
1593         followerActorContext.setCurrentBehavior(follower);
1594
1595         leader = new Leader(leaderActorContext);
1596
1597         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1598         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1599                 AppendEntriesReply.class);
1600
1601         MessageCollectorActor.clearMessages(followerActor);
1602         MessageCollectorActor.clearMessages(leaderActor);
1603
1604         // Verify initial AppendEntries sent with the leader's current commit index.
1605         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1606         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1607         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
1608
1609         leaderActor.underlyingActor().setBehavior(leader);
1610         leaderActorContext.setCurrentBehavior(leader);
1611
1612         leader.handleMessage(followerActor, appendEntriesReply);
1613
1614         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
1615         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1616
1617         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1618         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1619         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1620
1621         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1622         assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
1623         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1624                 appendEntries.getEntries().get(0).getData());
1625         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1626         assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
1627         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1628                 appendEntries.getEntries().get(1).getData());
1629
1630         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1631         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
1632
1633         List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
1634
1635         ApplyState applyState = applyStateList.get(0);
1636         assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
1637         assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1638         assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
1639                 applyState.getReplicatedLogEntry().getData());
1640
1641         applyState = applyStateList.get(1);
1642         assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
1643         assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
1644         assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
1645                 applyState.getReplicatedLogEntry().getData());
1646
1647         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
1648         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
1649         assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
1650     }
1651
1652     @Test
1653     public void testHandleAppendEntriesReplyWithNewerTerm() {
1654         logStart("testHandleAppendEntriesReplyWithNewerTerm");
1655
1656         MockRaftActorContext leaderActorContext = createActorContext();
1657         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1658                 new FiniteDuration(10000, TimeUnit.SECONDS));
1659
1660         leaderActorContext.setReplicatedLog(
1661                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1662
1663         leader = new Leader(leaderActorContext);
1664         leaderActor.underlyingActor().setBehavior(leader);
1665         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1666
1667         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1668                 AppendEntriesReply.class);
1669
1670         assertEquals(false, appendEntriesReply.isSuccess());
1671         assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1672
1673         MessageCollectorActor.clearMessages(leaderActor);
1674     }
1675
1676     @Test
1677     public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() {
1678         logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
1679
1680         MockRaftActorContext leaderActorContext = createActorContext();
1681         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1682                 new FiniteDuration(10000, TimeUnit.SECONDS));
1683
1684         leaderActorContext.setReplicatedLog(
1685                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
1686         leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
1687
1688         leader = new Leader(leaderActorContext);
1689         leaderActor.underlyingActor().setBehavior(leader);
1690         leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
1691
1692         AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1693                 AppendEntriesReply.class);
1694
1695         assertEquals(false, appendEntriesReply.isSuccess());
1696         assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
1697
1698         MessageCollectorActor.clearMessages(leaderActor);
1699     }
1700
1701     @Test
1702     public void testHandleAppendEntriesReplySuccess() {
1703         logStart("testHandleAppendEntriesReplySuccess");
1704
1705         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1706
1707         leaderActorContext.setReplicatedLog(
1708                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
1709
1710         leaderActorContext.setCommitIndex(1);
1711         leaderActorContext.setLastApplied(1);
1712         leaderActorContext.getTermInformation().update(1, "leader");
1713
1714         leader = new Leader(leaderActorContext);
1715
1716         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1717
1718         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
1719         assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
1720
1721         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
1722
1723         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1724
1725         assertEquals(RaftState.Leader, raftActorBehavior.state());
1726
1727         assertEquals(2, leaderActorContext.getCommitIndex());
1728
1729         ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
1730                 leaderActor, ApplyJournalEntries.class);
1731
1732         assertEquals(2, leaderActorContext.getLastApplied());
1733
1734         assertEquals(2, applyJournalEntries.getToIndex());
1735
1736         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
1737                 ApplyState.class);
1738
1739         assertEquals(1,applyStateList.size());
1740
1741         ApplyState applyState = applyStateList.get(0);
1742
1743         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
1744
1745         assertEquals(2, followerInfo.getMatchIndex());
1746         assertEquals(3, followerInfo.getNextIndex());
1747         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
1748         assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
1749     }
1750
1751     @Test
1752     public void testHandleAppendEntriesReplyUnknownFollower() {
1753         logStart("testHandleAppendEntriesReplyUnknownFollower");
1754
1755         MockRaftActorContext leaderActorContext = createActorContext();
1756
1757         leader = new Leader(leaderActorContext);
1758
1759         AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
1760
1761         RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
1762
1763         assertEquals(RaftState.Leader, raftActorBehavior.state());
1764     }
1765
1766     @Test
1767     public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
1768         logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
1769
1770         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1771         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1772                 new FiniteDuration(1000, TimeUnit.SECONDS));
1773         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
1774
1775         leaderActorContext.setReplicatedLog(
1776                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
1777         long leaderCommitIndex = 3;
1778         leaderActorContext.setCommitIndex(leaderCommitIndex);
1779         leaderActorContext.setLastApplied(leaderCommitIndex);
1780
1781         final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
1782         final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
1783         final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
1784         final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
1785
1786         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1787
1788         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
1789         followerActorContext.setCommitIndex(-1);
1790         followerActorContext.setLastApplied(-1);
1791
1792         Follower follower = new Follower(followerActorContext);
1793         followerActor.underlyingActor().setBehavior(follower);
1794         followerActorContext.setCurrentBehavior(follower);
1795
1796         leader = new Leader(leaderActorContext);
1797
1798         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
1799         final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor,
1800                 AppendEntriesReply.class);
1801
1802         MessageCollectorActor.clearMessages(followerActor);
1803         MessageCollectorActor.clearMessages(leaderActor);
1804
1805         // Verify initial AppendEntries sent with the leader's current commit index.
1806         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
1807         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
1808         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
1809
1810         leaderActor.underlyingActor().setBehavior(leader);
1811         leaderActorContext.setCurrentBehavior(leader);
1812
1813         leader.handleMessage(followerActor, appendEntriesReply);
1814
1815         List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor,
1816                 AppendEntries.class, 2);
1817         MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
1818
1819         appendEntries = appendEntriesList.get(0);
1820         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1821         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
1822         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1823
1824         assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
1825         assertEquals("First entry data", leadersFirstLogEntry.getData(),
1826                 appendEntries.getEntries().get(0).getData());
1827         assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
1828         assertEquals("Second entry data", leadersSecondLogEntry.getData(),
1829                 appendEntries.getEntries().get(1).getData());
1830
1831         appendEntries = appendEntriesList.get(1);
1832         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
1833         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
1834         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
1835
1836         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
1837         assertEquals("First entry data", leadersThirdLogEntry.getData(),
1838                 appendEntries.getEntries().get(0).getData());
1839         assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
1840         assertEquals("Second entry data", leadersFourthLogEntry.getData(),
1841                 appendEntries.getEntries().get(1).getData());
1842
1843         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
1844         assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
1845
1846         MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
1847
1848         assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
1849         assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
1850     }
1851
1852     @Test
1853     public void testHandleRequestVoteReply() {
1854         logStart("testHandleRequestVoteReply");
1855
1856         MockRaftActorContext leaderActorContext = createActorContext();
1857
1858         leader = new Leader(leaderActorContext);
1859
1860         // Should be a no-op.
1861         RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
1862                 new RequestVoteReply(1, true));
1863
1864         assertEquals(RaftState.Leader, raftActorBehavior.state());
1865
1866         raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
1867
1868         assertEquals(RaftState.Leader, raftActorBehavior.state());
1869     }
1870
1871     @Test
1872     public void testIsolatedLeaderCheckNoFollowers() {
1873         logStart("testIsolatedLeaderCheckNoFollowers");
1874
1875         MockRaftActorContext leaderActorContext = createActorContext();
1876
1877         leader = new Leader(leaderActorContext);
1878         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1879         assertTrue(newBehavior instanceof Leader);
1880     }
1881
1882     @Test
1883     public void testIsolatedLeaderCheckNoVotingFollowers() {
1884         logStart("testIsolatedLeaderCheckNoVotingFollowers");
1885
1886         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
1887         Follower follower = new Follower(followerActorContext);
1888         followerActor.underlyingActor().setBehavior(follower);
1889
1890         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
1891         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
1892                 new FiniteDuration(1000, TimeUnit.SECONDS));
1893         leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
1894
1895         leader = new Leader(leaderActorContext);
1896         leader.getFollower(FOLLOWER_ID).markFollowerActive();
1897         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1898         assertTrue("Expected Leader", newBehavior instanceof Leader);
1899     }
1900
1901     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
1902         ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
1903         ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
1904
1905         MockRaftActorContext leaderActorContext = createActorContext();
1906
1907         Map<String, String> peerAddresses = new HashMap<>();
1908         peerAddresses.put("follower-1", followerActor1.path().toString());
1909         peerAddresses.put("follower-2", followerActor2.path().toString());
1910
1911         leaderActorContext.setPeerAddresses(peerAddresses);
1912         leaderActorContext.setRaftPolicy(raftPolicy);
1913
1914         leader = new Leader(leaderActorContext);
1915
1916         leader.markFollowerActive("follower-1");
1917         leader.markFollowerActive("follower-2");
1918         RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1919         assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
1920
1921         // kill 1 follower and verify if that got killed
1922         final TestKit probe = new TestKit(getSystem());
1923         probe.watch(followerActor1);
1924         followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1925         final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
1926         assertEquals(termMsg1.getActor(), followerActor1);
1927
1928         leader.markFollowerInActive("follower-1");
1929         leader.markFollowerActive("follower-2");
1930         newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1931         assertTrue("Behavior not instance of Leader when majority of followers are active",
1932                 newBehavior instanceof Leader);
1933
1934         // kill 2nd follower and leader should change to Isolated leader
1935         followerActor2.tell(PoisonPill.getInstance(), null);
1936         probe.watch(followerActor2);
1937         followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1938         final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
1939         assertEquals(termMsg2.getActor(), followerActor2);
1940
1941         leader.markFollowerInActive("follower-2");
1942         return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
1943     }
1944
1945     @Test
1946     public void testIsolatedLeaderCheckTwoFollowers() {
1947         logStart("testIsolatedLeaderCheckTwoFollowers");
1948
1949         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
1950
1951         assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
1952             newBehavior instanceof IsolatedLeader);
1953     }
1954
1955     @Test
1956     public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
1957         logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
1958
1959         RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
1960
1961         assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
1962                 newBehavior instanceof Leader);
1963     }
1964
1965     @Test
1966     public void testLaggingFollowerStarvation() {
1967         logStart("testLaggingFollowerStarvation");
1968
1969         String leaderActorId = actorFactory.generateActorId("leader");
1970         String follower1ActorId = actorFactory.generateActorId("follower");
1971         String follower2ActorId = actorFactory.generateActorId("follower");
1972
1973         final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
1974         final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
1975
1976         MockRaftActorContext leaderActorContext =
1977                 new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
1978
1979         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
1980         configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
1981         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
1982
1983         leaderActorContext.setConfigParams(configParams);
1984
1985         leaderActorContext.setReplicatedLog(
1986                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
1987
1988         Map<String, String> peerAddresses = new HashMap<>();
1989         peerAddresses.put(follower1ActorId,
1990                 follower1Actor.path().toString());
1991         peerAddresses.put(follower2ActorId,
1992                 follower2Actor.path().toString());
1993
1994         leaderActorContext.setPeerAddresses(peerAddresses);
1995         leaderActorContext.getTermInformation().update(1, leaderActorId);
1996
1997         leader = createBehavior(leaderActorContext);
1998
1999         leaderActor.underlyingActor().setBehavior(leader);
2000
2001         for (int i = 1; i < 6; i++) {
2002             // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
2003             RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor,
2004                     new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
2005             assertTrue(newBehavior == leader);
2006             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
2007         }
2008
2009         // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
2010         List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
2011
2012         assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
2013                 heartbeats.size() > 1);
2014
2015         // Check if follower-2 got AppendEntries during this time and was not starved
2016         List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
2017
2018         assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
2019                 appendEntries.size() > 1);
2020     }
2021
2022     @Test
2023     public void testReplicationConsensusWithNonVotingFollower() {
2024         logStart("testReplicationConsensusWithNonVotingFollower");
2025
2026         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2027         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2028                 new FiniteDuration(1000, TimeUnit.SECONDS));
2029
2030         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2031         leaderActorContext.setCommitIndex(-1);
2032         leaderActorContext.setLastApplied(-1);
2033
2034         String nonVotingFollowerId = "nonvoting-follower";
2035         ActorRef nonVotingFollowerActor = actorFactory.createActor(
2036                 MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
2037
2038         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
2039                 VotingState.NON_VOTING);
2040
2041         leader = new Leader(leaderActorContext);
2042         leaderActorContext.setCurrentBehavior(leader);
2043
2044         // Ignore initial heartbeats
2045         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2046         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2047
2048         MessageCollectorActor.clearMessages(followerActor);
2049         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2050         MessageCollectorActor.clearMessages(leaderActor);
2051
2052         // Send a Replicate message and wait for AppendEntries.
2053         sendReplicate(leaderActorContext, 0);
2054
2055         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2056         MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
2057
2058         // Send reply only from the voting follower and verify consensus via ApplyState.
2059         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2060
2061         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2062
2063         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
2064
2065         MessageCollectorActor.clearMessages(followerActor);
2066         MessageCollectorActor.clearMessages(nonVotingFollowerActor);
2067         MessageCollectorActor.clearMessages(leaderActor);
2068
2069         // Send another Replicate message
2070         sendReplicate(leaderActorContext, 1);
2071
2072         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2073         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
2074                 AppendEntries.class);
2075         assertEquals("Log entries size", 1, appendEntries.getEntries().size());
2076         assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
2077
2078         // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
2079         leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
2080
2081         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
2082
2083         // Send reply from the voting follower and verify consensus.
2084         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2085
2086         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2087     }
2088
2089     @Test
2090     public void testTransferLeadershipWithFollowerInSync() {
2091         logStart("testTransferLeadershipWithFollowerInSync");
2092
2093         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2094         leaderActorContext.setLastApplied(-1);
2095         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2096                 new FiniteDuration(1000, TimeUnit.SECONDS));
2097         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2098
2099         leader = new Leader(leaderActorContext);
2100         leaderActorContext.setCurrentBehavior(leader);
2101
2102         // Initial heartbeat
2103         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2104         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2105         MessageCollectorActor.clearMessages(followerActor);
2106
2107         sendReplicate(leaderActorContext, 0);
2108         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2109
2110         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2111         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
2112         MessageCollectorActor.clearMessages(followerActor);
2113
2114         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2115         leader.transferLeadership(mockTransferCohort);
2116
2117         verify(mockTransferCohort, never()).transferComplete();
2118         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2119         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2120         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
2121
2122         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2123         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
2124
2125         // Leader should force an election timeout
2126         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2127
2128         verify(mockTransferCohort).transferComplete();
2129     }
2130
2131     @Test
2132     public void testTransferLeadershipWithEmptyLog() {
2133         logStart("testTransferLeadershipWithEmptyLog");
2134
2135         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2136         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2137                 new FiniteDuration(1000, TimeUnit.SECONDS));
2138         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2139
2140         leader = new Leader(leaderActorContext);
2141         leaderActorContext.setCurrentBehavior(leader);
2142
2143         // Initial heartbeat
2144         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2145         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2146         MessageCollectorActor.clearMessages(followerActor);
2147
2148         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2149         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2150         leader.transferLeadership(mockTransferCohort);
2151
2152         verify(mockTransferCohort, never()).transferComplete();
2153         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2154         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2155
2156         // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
2157         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2158
2159         // Leader should force an election timeout
2160         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2161
2162         verify(mockTransferCohort).transferComplete();
2163     }
2164
2165     @Test
2166     public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
2167         logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
2168
2169         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2170         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2171                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2172
2173         leader = new Leader(leaderActorContext);
2174         leaderActorContext.setCurrentBehavior(leader);
2175
2176         // Initial heartbeat
2177         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2178         MessageCollectorActor.clearMessages(followerActor);
2179
2180         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2181         doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
2182         leader.transferLeadership(mockTransferCohort);
2183
2184         verify(mockTransferCohort, never()).transferComplete();
2185
2186         // Sync up the follower.
2187         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2188         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2189         MessageCollectorActor.clearMessages(followerActor);
2190
2191         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2192                 .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2193         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2194         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2195         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
2196
2197         // Leader should force an election timeout
2198         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
2199
2200         verify(mockTransferCohort).transferComplete();
2201     }
2202
2203     @Test
2204     public void testTransferLeadershipWithFollowerSyncTimeout() {
2205         logStart("testTransferLeadershipWithFollowerSyncTimeout");
2206
2207         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2208         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2209                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
2210         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
2211         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2212
2213         leader = new Leader(leaderActorContext);
2214         leaderActorContext.setCurrentBehavior(leader);
2215
2216         // Initial heartbeat
2217         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2218         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
2219         MessageCollectorActor.clearMessages(followerActor);
2220
2221         sendReplicate(leaderActorContext, 0);
2222         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2223
2224         MessageCollectorActor.clearMessages(followerActor);
2225
2226         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
2227         leader.transferLeadership(mockTransferCohort);
2228
2229         verify(mockTransferCohort, never()).transferComplete();
2230
2231         // Send heartbeats to time out the transfer.
2232         for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
2233             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2234                     .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
2235             leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2236         }
2237
2238         verify(mockTransferCohort).abortTransfer();
2239         verify(mockTransferCohort, never()).transferComplete();
2240         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
2241     }
2242
2243     @Test
2244     public void testReplicationWithPayloadSizeThatExceedsThreshold() {
2245         logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
2246
2247         final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
2248                 Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
2249                         new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
2250         final MockRaftActorContext.MockPayload largePayload =
2251                 new MockRaftActorContext.MockPayload("large", serializedSize);
2252
2253         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2254         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2255                 new FiniteDuration(300, TimeUnit.MILLISECONDS));
2256         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
2257         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2258         leaderActorContext.setCommitIndex(-1);
2259         leaderActorContext.setLastApplied(-1);
2260
2261         leader = new Leader(leaderActorContext);
2262         leaderActorContext.setCurrentBehavior(leader);
2263
2264         // Send initial heartbeat reply so follower is marked active
2265         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2266         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2267         MessageCollectorActor.clearMessages(followerActor);
2268
2269         // Send normal payload first to prime commit index.
2270         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2271         sendReplicate(leaderActorContext, term, 0);
2272
2273         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2274         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2275         assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
2276
2277         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
2278         assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
2279         MessageCollectorActor.clearMessages(followerActor);
2280
2281         // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
2282         sendReplicate(leaderActorContext, term, 1, largePayload);
2283
2284         MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2285         assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
2286         assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
2287
2288         final Identifier slicingId = messageSlice.getIdentifier();
2289
2290         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2291         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
2292         assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
2293         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2294         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2295         MessageCollectorActor.clearMessages(followerActor);
2296
2297         // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
2298
2299         // Sleep for the heartbeat interval so AppendEntries is sent.
2300         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2301                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2302
2303         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2304
2305         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2306         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2307         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2308         MessageCollectorActor.clearMessages(followerActor);
2309
2310         // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
2311
2312         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
2313         messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2314         assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
2315
2316         leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
2317
2318         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
2319
2320         MessageCollectorActor.clearMessages(followerActor);
2321
2322         // Send another normal payload.
2323
2324         sendReplicate(leaderActorContext, term, 2);
2325
2326         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2327         assertEquals("Entries size", 1, appendEntries.getEntries().size());
2328         assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
2329         assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
2330     }
2331
2332     @Test
2333     public void testLargePayloadSlicingExpiration() {
2334         logStart("testLargePayloadSlicingExpiration");
2335
2336         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2337         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2338                 new FiniteDuration(100, TimeUnit.MILLISECONDS));
2339         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
2340         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
2341         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2342         leaderActorContext.setCommitIndex(-1);
2343         leaderActorContext.setLastApplied(-1);
2344
2345         final long term = leaderActorContext.getTermInformation().getCurrentTerm();
2346         leader = new Leader(leaderActorContext);
2347         leaderActorContext.setCurrentBehavior(leader);
2348
2349         // Send initial heartbeat reply so follower is marked active
2350         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2351         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
2352         MessageCollectorActor.clearMessages(followerActor);
2353
2354         sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
2355                 leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
2356         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2357
2358         // Sleep for at least 3 * election timeout so the slicing state expires.
2359         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2360                 .getElectionTimeOutInterval().toMillis() * 3  + 50, TimeUnit.MILLISECONDS);
2361         MessageCollectorActor.clearMessages(followerActor);
2362
2363         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2364
2365         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2366         assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
2367         assertEquals("Entries size", 0, appendEntries.getEntries().size());
2368
2369         MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
2370         MessageCollectorActor.clearMessages(followerActor);
2371
2372         // Send an AppendEntriesReply - this should restart the slicing.
2373
2374         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2375                 .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
2376
2377         leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
2378
2379         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
2380     }
2381
2382     @Test
2383     public void testLeaderAddressInAppendEntries() {
2384         logStart("testLeaderAddressInAppendEntries");
2385
2386         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
2387         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
2388                 FiniteDuration.create(50, TimeUnit.MILLISECONDS));
2389         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
2390         leaderActorContext.setCommitIndex(-1);
2391         leaderActorContext.setLastApplied(-1);
2392
2393         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
2394             peerId -> leaderActor.path().toString());
2395
2396         leader = new Leader(leaderActorContext);
2397         leaderActorContext.setCurrentBehavior(leader);
2398
2399         // Initial heartbeat shouldn't have the leader address
2400
2401         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2402         assertFalse(appendEntries.getLeaderAddress().isPresent());
2403         MessageCollectorActor.clearMessages(followerActor);
2404
2405         // Send AppendEntriesReply indicating the follower needs the leader address
2406
2407         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
2408                 RaftVersions.CURRENT_VERSION));
2409
2410         // Sleep for the heartbeat interval so AppendEntries is sent.
2411         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2412                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2413
2414         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2415
2416         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2417         assertTrue(appendEntries.getLeaderAddress().isPresent());
2418         assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
2419         MessageCollectorActor.clearMessages(followerActor);
2420
2421         // Send AppendEntriesReply indicating the follower does not need the leader address
2422
2423         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
2424                 RaftVersions.CURRENT_VERSION));
2425
2426         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
2427                 .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
2428
2429         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
2430
2431         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
2432         assertFalse(appendEntries.getLeaderAddress().isPresent());
2433     }
2434
2435     @Override
2436     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
2437             final ActorRef actorRef, final RaftRPC rpc) {
2438         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
2439         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
2440     }
2441
2442     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
2443
2444         private final long electionTimeOutIntervalMillis;
2445         private final int snapshotChunkSize;
2446
2447         MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
2448             this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
2449             this.snapshotChunkSize = snapshotChunkSize;
2450         }
2451
2452         @Override
2453         public FiniteDuration getElectionTimeOutInterval() {
2454             return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
2455         }
2456
2457         @Override
2458         public int getSnapshotChunkSize() {
2459             return snapshotChunkSize;
2460         }
2461     }
2462 }