9ca2ba0a6b40e27766938d1d1298735aab676421
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.testkit.TestActorRef;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.Snapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
44 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
45 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
50 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
53 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
54 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
55 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
56 import scala.concurrent.duration.FiniteDuration;
57
58 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
59
60     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
61             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
62
63     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
64             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
65
66     private Follower follower;
67
68     private final short payloadVersion = 5;
69
70     @Override
71     @After
72     public void tearDown() throws Exception {
73         if (follower != null) {
74             follower.close();
75         }
76
77         super.tearDown();
78     }
79
80     @Override
81     protected Follower createBehavior(RaftActorContext actorContext) {
82         return spy(new Follower(actorContext));
83     }
84
85     @Override
86     protected  MockRaftActorContext createActorContext() {
87         return createActorContext(followerActor);
88     }
89
90     @Override
91     protected  MockRaftActorContext createActorContext(ActorRef actorRef) {
92         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
93         context.setPayloadVersion(payloadVersion );
94         return context;
95     }
96
97     @Test
98     public void testThatAnElectionTimeoutIsTriggered() {
99         MockRaftActorContext actorContext = createActorContext();
100         follower = new Follower(actorContext);
101
102         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
103                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
104     }
105
106     @Test
107     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
108         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
109
110         MockRaftActorContext context = createActorContext();
111         follower = new Follower(context);
112
113         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
114                 TimeUnit.MILLISECONDS);
115         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
116
117         assertTrue(raftBehavior instanceof Candidate);
118     }
119
120     @Test
121     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
122         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
123
124         MockRaftActorContext context = createActorContext();
125         ((DefaultConfigParamsImpl) context.getConfigParams())
126                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
127         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
128
129         follower = new Follower(context);
130         context.setCurrentBehavior(follower);
131
132         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
133                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
134         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
135                 -1, -1, (short) 1));
136
137         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
138         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
139         assertTrue(raftBehavior instanceof Follower);
140
141         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
142                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
143         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
144                 -1, -1, (short) 1));
145
146         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
147         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
148         assertTrue(raftBehavior instanceof Follower);
149     }
150
151     @Test
152     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
153         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
154
155         MockRaftActorContext context = createActorContext();
156         long term = 1000;
157         context.getTermInformation().update(term, null);
158
159         follower = createBehavior(context);
160
161         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
162
163         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
164
165         assertEquals("isVoteGranted", true, reply.isVoteGranted());
166         assertEquals("getTerm", term, reply.getTerm());
167         verify(follower).scheduleElection(any(FiniteDuration.class));
168     }
169
170     @Test
171     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
172         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
173
174         MockRaftActorContext context = createActorContext();
175         long term = 1000;
176         context.getTermInformation().update(term, "test");
177
178         follower = createBehavior(context);
179
180         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
181
182         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
183
184         assertEquals("isVoteGranted", false, reply.isVoteGranted());
185         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
186     }
187
188
189     @Test
190     public void testHandleFirstAppendEntries() throws Exception {
191         logStart("testHandleFirstAppendEntries");
192
193         MockRaftActorContext context = createActorContext();
194         context.getReplicatedLog().clear(0,2);
195         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
196         context.getReplicatedLog().setSnapshotIndex(99);
197
198         List<ReplicatedLogEntry> entries = Arrays.asList(
199                 newReplicatedLogEntry(2, 101, "foo"));
200
201         Assert.assertEquals(1, context.getReplicatedLog().size());
202
203         // The new commitIndex is 101
204         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
205
206         follower = createBehavior(context);
207         follower.handleMessage(leaderActor, appendEntries);
208
209         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
210                 FollowerInitialSyncUpStatus.class);
211         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
212
213         assertFalse(syncStatus.isInitialSyncDone());
214         assertTrue("append entries reply should be true", reply.isSuccess());
215     }
216
217     @Test
218     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
219         logStart("testHandleFirstAppendEntries");
220
221         MockRaftActorContext context = createActorContext();
222
223         List<ReplicatedLogEntry> entries = Arrays.asList(
224                 newReplicatedLogEntry(2, 101, "foo"));
225
226         // The new commitIndex is 101
227         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
228
229         follower = createBehavior(context);
230         follower.handleMessage(leaderActor, appendEntries);
231
232         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
233                 FollowerInitialSyncUpStatus.class);
234         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
235
236         assertFalse(syncStatus.isInitialSyncDone());
237         assertFalse("append entries reply should be false", reply.isSuccess());
238     }
239
240     @Test
241     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
242             throws Exception {
243         logStart("testHandleFirstAppendEntries");
244
245         MockRaftActorContext context = createActorContext();
246         context.getReplicatedLog().clear(0,2);
247         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
248         context.getReplicatedLog().setSnapshotIndex(99);
249
250         List<ReplicatedLogEntry> entries = Arrays.asList(
251                 newReplicatedLogEntry(2, 101, "foo"));
252
253         // The new commitIndex is 101
254         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
255
256         follower = createBehavior(context);
257         follower.handleMessage(leaderActor, appendEntries);
258
259         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
260                 FollowerInitialSyncUpStatus.class);
261         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
262
263         assertFalse(syncStatus.isInitialSyncDone());
264         assertTrue("append entries reply should be true", reply.isSuccess());
265     }
266
267     @Test
268     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
269             throws Exception {
270         logStart("testHandleFirstAppendEntries");
271
272         MockRaftActorContext context = createActorContext();
273         context.getReplicatedLog().clear(0,2);
274         context.getReplicatedLog().setSnapshotIndex(100);
275
276         List<ReplicatedLogEntry> entries = Arrays.asList(
277                 newReplicatedLogEntry(2, 101, "foo"));
278
279         // The new commitIndex is 101
280         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
281
282         follower = createBehavior(context);
283         follower.handleMessage(leaderActor, appendEntries);
284
285         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
286                 FollowerInitialSyncUpStatus.class);
287         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
288
289         assertFalse(syncStatus.isInitialSyncDone());
290         assertTrue("append entries reply should be true", reply.isSuccess());
291     }
292
293     @Test
294     public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
295             throws Exception {
296         logStart(
297                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
298
299         MockRaftActorContext context = createActorContext();
300         context.getReplicatedLog().clear(0,2);
301         context.getReplicatedLog().setSnapshotIndex(100);
302
303         List<ReplicatedLogEntry> entries = Arrays.asList(
304                 newReplicatedLogEntry(2, 105, "foo"));
305
306         // The new commitIndex is 101
307         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
308
309         follower = createBehavior(context);
310         follower.handleMessage(leaderActor, appendEntries);
311
312         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
313                 FollowerInitialSyncUpStatus.class);
314         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
315
316         assertFalse(syncStatus.isInitialSyncDone());
317         assertFalse("append entries reply should be false", reply.isSuccess());
318     }
319
320     @Test
321     public void testHandleSyncUpAppendEntries() throws Exception {
322         logStart("testHandleSyncUpAppendEntries");
323
324         MockRaftActorContext context = createActorContext();
325
326         List<ReplicatedLogEntry> entries = Arrays.asList(
327                 newReplicatedLogEntry(2, 101, "foo"));
328
329         // The new commitIndex is 101
330         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
331
332         follower = createBehavior(context);
333         follower.handleMessage(leaderActor, appendEntries);
334
335         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
336                 FollowerInitialSyncUpStatus.class);
337
338         assertFalse(syncStatus.isInitialSyncDone());
339
340         // Clear all the messages
341         followerActor.underlyingActor().clear();
342
343         context.setLastApplied(101);
344         context.setCommitIndex(101);
345         setLastLogEntry(context, 1, 101,
346                 new MockRaftActorContext.MockPayload(""));
347
348         entries = Arrays.asList(
349                 newReplicatedLogEntry(2, 101, "foo"));
350
351         // The new commitIndex is 101
352         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
353         follower.handleMessage(leaderActor, appendEntries);
354
355         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
356
357         assertTrue(syncStatus.isInitialSyncDone());
358
359         followerActor.underlyingActor().clear();
360
361         // Sending the same message again should not generate another message
362
363         follower.handleMessage(leaderActor, appendEntries);
364
365         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
366
367         assertNull(syncStatus);
368
369     }
370
371     @Test
372     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
373         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
374
375         MockRaftActorContext context = createActorContext();
376
377         List<ReplicatedLogEntry> entries = Arrays.asList(
378                 newReplicatedLogEntry(2, 101, "foo"));
379
380         // The new commitIndex is 101
381         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
382
383         follower = createBehavior(context);
384         follower.handleMessage(leaderActor, appendEntries);
385
386         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
387                 FollowerInitialSyncUpStatus.class);
388
389         assertFalse(syncStatus.isInitialSyncDone());
390
391         // Clear all the messages
392         followerActor.underlyingActor().clear();
393
394         context.setLastApplied(100);
395         setLastLogEntry(context, 1, 100,
396                 new MockRaftActorContext.MockPayload(""));
397
398         entries = Arrays.asList(
399                 newReplicatedLogEntry(2, 101, "foo"));
400
401         // leader-2 is becoming the leader now and it says the commitIndex is 45
402         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
403         follower.handleMessage(leaderActor, appendEntries);
404
405         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
406
407         // We get a new message saying initial status is not done
408         assertFalse(syncStatus.isInitialSyncDone());
409
410     }
411
412
413     @Test
414     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
415         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
416
417         MockRaftActorContext context = createActorContext();
418
419         List<ReplicatedLogEntry> entries = Arrays.asList(
420                 newReplicatedLogEntry(2, 101, "foo"));
421
422         // The new commitIndex is 101
423         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
424
425         follower = createBehavior(context);
426         follower.handleMessage(leaderActor, appendEntries);
427
428         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
429                 FollowerInitialSyncUpStatus.class);
430
431         assertFalse(syncStatus.isInitialSyncDone());
432
433         // Clear all the messages
434         followerActor.underlyingActor().clear();
435
436         context.setLastApplied(101);
437         context.setCommitIndex(101);
438         setLastLogEntry(context, 1, 101,
439                 new MockRaftActorContext.MockPayload(""));
440
441         entries = Arrays.asList(
442                 newReplicatedLogEntry(2, 101, "foo"));
443
444         // The new commitIndex is 101
445         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
446         follower.handleMessage(leaderActor, appendEntries);
447
448         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
449
450         assertTrue(syncStatus.isInitialSyncDone());
451
452         // Clear all the messages
453         followerActor.underlyingActor().clear();
454
455         context.setLastApplied(100);
456         setLastLogEntry(context, 1, 100,
457                 new MockRaftActorContext.MockPayload(""));
458
459         entries = Arrays.asList(
460                 newReplicatedLogEntry(2, 101, "foo"));
461
462         // leader-2 is becoming the leader now and it says the commitIndex is 45
463         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
464         follower.handleMessage(leaderActor, appendEntries);
465
466         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
467
468         // We get a new message saying initial status is not done
469         assertFalse(syncStatus.isInitialSyncDone());
470
471     }
472
473
474     /**
475      * This test verifies that when an AppendEntries RPC is received by a RaftActor
476      * with a commitIndex that is greater than what has been applied to the
477      * state machine of the RaftActor, the RaftActor applies the state and
478      * sets it current applied state to the commitIndex of the sender.
479      */
480     @Test
481     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
482         logStart("testHandleAppendEntriesWithNewerCommitIndex");
483
484         MockRaftActorContext context = createActorContext();
485
486         context.setLastApplied(100);
487         setLastLogEntry(context, 1, 100,
488                 new MockRaftActorContext.MockPayload(""));
489         context.getReplicatedLog().setSnapshotIndex(99);
490
491         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
492                 newReplicatedLogEntry(2, 101, "foo"));
493
494         // The new commitIndex is 101
495         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
496
497         follower = createBehavior(context);
498         follower.handleMessage(leaderActor, appendEntries);
499
500         assertEquals("getLastApplied", 101L, context.getLastApplied());
501     }
502
503     /**
504      * This test verifies that when an AppendEntries is received a specific prevLogTerm
505      * which does not match the term that is in RaftActors log entry at prevLogIndex
506      * then the RaftActor does not change it's state and it returns a failure.
507      */
508     @Test
509     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
510         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
511
512         MockRaftActorContext context = createActorContext();
513
514         // First set the receivers term to lower number
515         context.getTermInformation().update(95, "test");
516
517         // AppendEntries is now sent with a bigger term
518         // this will set the receivers term to be the same as the sender's term
519         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
520
521         follower = createBehavior(context);
522
523         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
524
525         Assert.assertSame(follower, newBehavior);
526
527         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
528                 AppendEntriesReply.class);
529
530         assertEquals("isSuccess", false, reply.isSuccess());
531     }
532
533     /**
534      * This test verifies that when a new AppendEntries message is received with
535      * new entries and the logs of the sender and receiver match that the new
536      * entries get added to the log and the log is incremented by the number of
537      * entries received in appendEntries.
538      */
539     @Test
540     public void testHandleAppendEntriesAddNewEntries() {
541         logStart("testHandleAppendEntriesAddNewEntries");
542
543         MockRaftActorContext context = createActorContext();
544
545         // First set the receivers term to lower number
546         context.getTermInformation().update(1, "test");
547
548         // Prepare the receivers log
549         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
550         log.append(newReplicatedLogEntry(1, 0, "zero"));
551         log.append(newReplicatedLogEntry(1, 1, "one"));
552         log.append(newReplicatedLogEntry(1, 2, "two"));
553
554         context.setReplicatedLog(log);
555
556         // Prepare the entries to be sent with AppendEntries
557         List<ReplicatedLogEntry> entries = new ArrayList<>();
558         entries.add(newReplicatedLogEntry(1, 3, "three"));
559         entries.add(newReplicatedLogEntry(1, 4, "four"));
560
561         // Send appendEntries with the same term as was set on the receiver
562         // before the new behavior was created (1 in this case)
563         // This will not work for a Candidate because as soon as a Candidate
564         // is created it increments the term
565         short leaderPayloadVersion = 10;
566         String leaderId = "leader-1";
567         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
568
569         follower = createBehavior(context);
570
571         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
572
573         Assert.assertSame(follower, newBehavior);
574
575         assertEquals("Next index", 5, log.last().getIndex() + 1);
576         assertEquals("Entry 3", entries.get(0), log.get(3));
577         assertEquals("Entry 4", entries.get(1), log.get(4));
578
579         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
580         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
581
582         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
583     }
584
585     /**
586      * This test verifies that when a new AppendEntries message is received with
587      * new entries and the logs of the sender and receiver are out-of-sync that
588      * the log is first corrected by removing the out of sync entries from the
589      * log and then adding in the new entries sent with the AppendEntries message.
590      */
591     @Test
592     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
593         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
594
595         MockRaftActorContext context = createActorContext();
596
597         // First set the receivers term to lower number
598         context.getTermInformation().update(1, "test");
599
600         // Prepare the receivers log
601         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
602         log.append(newReplicatedLogEntry(1, 0, "zero"));
603         log.append(newReplicatedLogEntry(1, 1, "one"));
604         log.append(newReplicatedLogEntry(1, 2, "two"));
605
606         context.setReplicatedLog(log);
607
608         // Prepare the entries to be sent with AppendEntries
609         List<ReplicatedLogEntry> entries = new ArrayList<>();
610         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
611         entries.add(newReplicatedLogEntry(2, 3, "three"));
612
613         // Send appendEntries with the same term as was set on the receiver
614         // before the new behavior was created (1 in this case)
615         // This will not work for a Candidate because as soon as a Candidate
616         // is created it increments the term
617         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
618
619         follower = createBehavior(context);
620
621         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
622
623         Assert.assertSame(follower, newBehavior);
624
625         // The entry at index 2 will be found out-of-sync with the leader
626         // and will be removed
627         // Then the two new entries will be added to the log
628         // Thus making the log to have 4 entries
629         assertEquals("Next index", 4, log.last().getIndex() + 1);
630         //assertEquals("Entry 2", entries.get(0), log.get(2));
631
632         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
633
634         // Check that the entry at index 2 has the new data
635         assertEquals("Entry 2", entries.get(0), log.get(2));
636
637         assertEquals("Entry 3", entries.get(1), log.get(3));
638
639         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
640     }
641
642     @Test
643     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
644         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
645
646         MockRaftActorContext context = createActorContext();
647
648         // First set the receivers term to lower number
649         context.getTermInformation().update(1, "test");
650
651         // Prepare the receivers log
652         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
653         log.append(newReplicatedLogEntry(1, 0, "zero"));
654         log.append(newReplicatedLogEntry(1, 1, "one"));
655         log.append(newReplicatedLogEntry(1, 2, "two"));
656
657         context.setReplicatedLog(log);
658
659         // Prepare the entries to be sent with AppendEntries
660         List<ReplicatedLogEntry> entries = new ArrayList<>();
661         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
662         entries.add(newReplicatedLogEntry(2, 3, "three"));
663
664         // Send appendEntries with the same term as was set on the receiver
665         // before the new behavior was created (1 in this case)
666         // This will not work for a Candidate because as soon as a Candidate
667         // is created it increments the term
668         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
669
670         context.setRaftPolicy(createRaftPolicy(false, true));
671         follower = createBehavior(context);
672
673         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
674
675         Assert.assertSame(follower, newBehavior);
676
677         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
678     }
679
680     @Test
681     public void testHandleAppendEntriesPreviousLogEntryMissing() {
682         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
683
684         final MockRaftActorContext context = createActorContext();
685
686         // Prepare the receivers log
687         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
688         log.append(newReplicatedLogEntry(1, 0, "zero"));
689         log.append(newReplicatedLogEntry(1, 1, "one"));
690         log.append(newReplicatedLogEntry(1, 2, "two"));
691
692         context.setReplicatedLog(log);
693
694         // Prepare the entries to be sent with AppendEntries
695         List<ReplicatedLogEntry> entries = new ArrayList<>();
696         entries.add(newReplicatedLogEntry(1, 4, "four"));
697
698         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
699
700         follower = createBehavior(context);
701
702         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
703
704         Assert.assertSame(follower, newBehavior);
705
706         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
707     }
708
709     @Test
710     public void testHandleAppendEntriesWithExistingLogEntry() {
711         logStart("testHandleAppendEntriesWithExistingLogEntry");
712
713         MockRaftActorContext context = createActorContext();
714
715         context.getTermInformation().update(1, "test");
716
717         // Prepare the receivers log
718         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
719         log.append(newReplicatedLogEntry(1, 0, "zero"));
720         log.append(newReplicatedLogEntry(1, 1, "one"));
721
722         context.setReplicatedLog(log);
723
724         // Send the last entry again.
725         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
726
727         follower = createBehavior(context);
728
729         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
730
731         assertEquals("Next index", 2, log.last().getIndex() + 1);
732         assertEquals("Entry 1", entries.get(0), log.get(1));
733
734         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
735
736         // Send the last entry again and also a new one.
737
738         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
739
740         leaderActor.underlyingActor().clear();
741         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
742
743         assertEquals("Next index", 3, log.last().getIndex() + 1);
744         assertEquals("Entry 1", entries.get(0), log.get(1));
745         assertEquals("Entry 2", entries.get(1), log.get(2));
746
747         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
748     }
749
750     @Test
751     public void testHandleAppendEntriesAfterInstallingSnapshot() {
752         logStart("testHandleAppendAfterInstallingSnapshot");
753
754         MockRaftActorContext context = createActorContext();
755
756         // Prepare the receivers log
757         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
758
759         // Set up a log as if it has been snapshotted
760         log.setSnapshotIndex(3);
761         log.setSnapshotTerm(1);
762
763         context.setReplicatedLog(log);
764
765         // Prepare the entries to be sent with AppendEntries
766         List<ReplicatedLogEntry> entries = new ArrayList<>();
767         entries.add(newReplicatedLogEntry(1, 4, "four"));
768
769         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
770
771         follower = createBehavior(context);
772
773         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
774
775         Assert.assertSame(follower, newBehavior);
776
777         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
778     }
779
780
781     /**
782      * This test verifies that when InstallSnapshot is received by
783      * the follower its applied correctly.
784      */
785     @Test
786     public void testHandleInstallSnapshot() throws Exception {
787         logStart("testHandleInstallSnapshot");
788
789         MockRaftActorContext context = createActorContext();
790         context.getTermInformation().update(1, "leader");
791
792         follower = createBehavior(context);
793
794         ByteString bsSnapshot  = createSnapshot();
795         int offset = 0;
796         int snapshotLength = bsSnapshot.size();
797         int chunkSize = 50;
798         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
799         int lastIncludedIndex = 1;
800         int chunkIndex = 1;
801         InstallSnapshot lastInstallSnapshot = null;
802
803         for (int i = 0; i < totalChunks; i++) {
804             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
805             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
806                     chunkData, chunkIndex, totalChunks);
807             follower.handleMessage(leaderActor, lastInstallSnapshot);
808             offset = offset + 50;
809             lastIncludedIndex++;
810             chunkIndex++;
811         }
812
813         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
814                 ApplySnapshot.class);
815         Snapshot snapshot = applySnapshot.getSnapshot();
816         assertNotNull(lastInstallSnapshot);
817         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
818         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
819                 snapshot.getLastAppliedTerm());
820         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
821                 snapshot.getLastAppliedIndex());
822         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
823         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
824         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
825         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
826         applySnapshot.getCallback().onSuccess();
827
828         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
829                 leaderActor, InstallSnapshotReply.class);
830         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
831
832         chunkIndex = 1;
833         for (InstallSnapshotReply reply: replies) {
834             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
835             assertEquals("getTerm", 1, reply.getTerm());
836             assertEquals("isSuccess", true, reply.isSuccess());
837             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
838         }
839
840         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
841     }
842
843
844     /**
845      * Verify that when an AppendEntries is sent to a follower during a snapshot install
846      * the Follower short-circuits the processing of the AppendEntries message.
847      */
848     @Test
849     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
850         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
851
852         MockRaftActorContext context = createActorContext();
853
854         follower = createBehavior(context);
855
856         ByteString bsSnapshot  = createSnapshot();
857         int snapshotLength = bsSnapshot.size();
858         int chunkSize = 50;
859         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
860         int lastIncludedIndex = 1;
861
862         // Check that snapshot installation is not in progress
863         assertNull(follower.getSnapshotTracker());
864
865         // Make sure that we have more than 1 chunk to send
866         assertTrue(totalChunks > 1);
867
868         // Send an install snapshot with the first chunk to start the process of installing a snapshot
869         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
870         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
871                 chunkData, 1, totalChunks));
872
873         // Check if snapshot installation is in progress now
874         assertNotNull(follower.getSnapshotTracker());
875
876         // Send an append entry
877         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
878                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
879
880         follower.handleMessage(leaderActor, appendEntries);
881
882         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
883         assertEquals("isSuccess", true, reply.isSuccess());
884         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
885         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
886         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
887
888         assertNotNull(follower.getSnapshotTracker());
889     }
890
891     @Test
892     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
893         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
894
895         MockRaftActorContext context = createActorContext();
896
897         follower = createBehavior(context);
898
899         ByteString bsSnapshot  = createSnapshot();
900         int snapshotLength = bsSnapshot.size();
901         int chunkSize = 50;
902         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
903         int lastIncludedIndex = 1;
904
905         // Check that snapshot installation is not in progress
906         assertNull(follower.getSnapshotTracker());
907
908         // Make sure that we have more than 1 chunk to send
909         assertTrue(totalChunks > 1);
910
911         // Send an install snapshot with the first chunk to start the process of installing a snapshot
912         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
913         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
914                 chunkData, 1, totalChunks));
915
916         // Check if snapshot installation is in progress now
917         assertNotNull(follower.getSnapshotTracker());
918
919         // Send appendEntries with a new term and leader.
920         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
921                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
922
923         follower.handleMessage(leaderActor, appendEntries);
924
925         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
926         assertEquals("isSuccess", true, reply.isSuccess());
927         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
928         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
929         assertEquals("getTerm", 2, reply.getTerm());
930
931         assertNull(follower.getSnapshotTracker());
932     }
933
934     @Test
935     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
936         logStart("testInitialSyncUpWithHandleInstallSnapshot");
937
938         MockRaftActorContext context = createActorContext();
939         context.setCommitIndex(-1);
940
941         follower = createBehavior(context);
942
943         ByteString bsSnapshot  = createSnapshot();
944         int offset = 0;
945         int snapshotLength = bsSnapshot.size();
946         int chunkSize = 50;
947         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
948         int lastIncludedIndex = 1;
949         int chunkIndex = 1;
950         InstallSnapshot lastInstallSnapshot = null;
951
952         for (int i = 0; i < totalChunks; i++) {
953             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
954             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
955                     chunkData, chunkIndex, totalChunks);
956             follower.handleMessage(leaderActor, lastInstallSnapshot);
957             offset = offset + 50;
958             lastIncludedIndex++;
959             chunkIndex++;
960         }
961
962         FollowerInitialSyncUpStatus syncStatus =
963                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
964
965         assertFalse(syncStatus.isInitialSyncDone());
966
967         // Clear all the messages
968         followerActor.underlyingActor().clear();
969
970         context.setLastApplied(101);
971         context.setCommitIndex(101);
972         setLastLogEntry(context, 1, 101,
973                 new MockRaftActorContext.MockPayload(""));
974
975         List<ReplicatedLogEntry> entries = Arrays.asList(
976                 newReplicatedLogEntry(2, 101, "foo"));
977
978         // The new commitIndex is 101
979         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
980         follower.handleMessage(leaderActor, appendEntries);
981
982         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
983
984         assertTrue(syncStatus.isInitialSyncDone());
985     }
986
987     @Test
988     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
989         logStart("testHandleOutOfSequenceInstallSnapshot");
990
991         MockRaftActorContext context = createActorContext();
992
993         follower = createBehavior(context);
994
995         ByteString bsSnapshot = createSnapshot();
996
997         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
998                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
999         follower.handleMessage(leaderActor, installSnapshot);
1000
1001         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1002                 InstallSnapshotReply.class);
1003
1004         assertEquals("isSuccess", false, reply.isSuccess());
1005         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1006         assertEquals("getTerm", 1, reply.getTerm());
1007         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1008
1009         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1010     }
1011
1012     @Test
1013     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1014         MockRaftActorContext context = createActorContext();
1015
1016         Stopwatch stopwatch = Stopwatch.createStarted();
1017
1018         follower = createBehavior(context);
1019
1020         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1021
1022         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1023
1024         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1025
1026         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1027         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1028     }
1029
1030     @Test
1031     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1032         MockRaftActorContext context = createActorContext();
1033         context.setConfigParams(new DefaultConfigParamsImpl() {
1034             @Override
1035             public FiniteDuration getElectionTimeOutInterval() {
1036                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1037             }
1038         });
1039
1040         context.setRaftPolicy(createRaftPolicy(false, false));
1041
1042         follower = createBehavior(context);
1043
1044         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1045         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1046         assertSame("handleMessage result", follower, newBehavior);
1047     }
1048
1049     @Test
1050     public void testFollowerSchedulesElectionIfNonVoting() {
1051         MockRaftActorContext context = createActorContext();
1052         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1053         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1054                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1055         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1056
1057         follower = new Follower(context, "leader", (short)1);
1058
1059         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1060                 ElectionTimeout.class);
1061         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1062         assertSame("handleMessage result", follower, newBehavior);
1063         assertNull("Expected null leaderId", follower.getLeaderId());
1064     }
1065
1066     @Test
1067     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1068         MockRaftActorContext context = createActorContext();
1069         follower = createBehavior(context);
1070         follower.handleMessage(leaderActor, new RaftRPC() {
1071             private static final long serialVersionUID = 1L;
1072
1073             @Override
1074             public long getTerm() {
1075                 return 100;
1076             }
1077         });
1078         verify(follower).scheduleElection(any(FiniteDuration.class));
1079     }
1080
1081     @Test
1082     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1083         MockRaftActorContext context = createActorContext();
1084         follower = createBehavior(context);
1085         follower.handleMessage(leaderActor, "non-raft-rpc");
1086         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1087     }
1088
1089     public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
1090         int snapshotLength = bs.size();
1091         int start = offset;
1092         int size = chunkSize;
1093         if (chunkSize > snapshotLength) {
1094             size = snapshotLength;
1095         } else {
1096             if (start + chunkSize > snapshotLength) {
1097                 size = snapshotLength - start;
1098             }
1099         }
1100
1101         byte[] nextChunk = new byte[size];
1102         bs.copyTo(nextChunk, start, 0, size);
1103         return nextChunk;
1104     }
1105
1106     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1107             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1108         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1109     }
1110
1111     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1112                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1113                                                    boolean expForceInstallSnapshot) {
1114
1115         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1116                 AppendEntriesReply.class);
1117
1118         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1119         assertEquals("getTerm", expTerm, reply.getTerm());
1120         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1121         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1122         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1123         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1124         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1125     }
1126
1127
1128     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1129         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1130                 new MockRaftActorContext.MockPayload(data));
1131     }
1132
1133     private ByteString createSnapshot() {
1134         HashMap<String, String> followerSnapshot = new HashMap<>();
1135         followerSnapshot.put("1", "A");
1136         followerSnapshot.put("2", "B");
1137         followerSnapshot.put("3", "C");
1138
1139         return toByteString(followerSnapshot);
1140     }
1141
1142     @Override
1143     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1144             ActorRef actorRef, RaftRPC rpc) throws Exception {
1145         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1146
1147         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1148         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1149     }
1150
1151     @Override
1152     protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1153             throws Exception {
1154         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1155         assertEquals("isSuccess", true, reply.isSuccess());
1156     }
1157 }