Fix issue when AE leader differs from prior install snapshot leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.Props;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.google.protobuf.ByteString;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Assert;
35 import org.junit.Test;
36 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
37 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
38 import org.opendaylight.controller.cluster.raft.RaftActorContext;
39 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.Snapshot;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
43 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
44 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
45 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
47 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
49 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
50 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
52 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
53 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
54 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
55 import scala.concurrent.duration.FiniteDuration;
56
57 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
58
59     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
60             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
61
62     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
63             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
64
65     private Follower follower;
66
67     private final short payloadVersion = 5;
68
69     @Override
70     @After
71     public void tearDown() throws Exception {
72         if(follower != null) {
73             follower.close();
74         }
75
76         super.tearDown();
77     }
78
79     @Override
80     protected Follower createBehavior(RaftActorContext actorContext) {
81         return spy(new Follower(actorContext));
82     }
83
84     @Override
85     protected  MockRaftActorContext createActorContext() {
86         return createActorContext(followerActor);
87     }
88
89     @Override
90     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
91         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
92         context.setPayloadVersion(payloadVersion );
93         return context;
94     }
95
96     @Test
97     public void testThatAnElectionTimeoutIsTriggered(){
98         MockRaftActorContext actorContext = createActorContext();
99         follower = new Follower(actorContext);
100
101         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
102                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
103     }
104
105     @Test
106     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
107         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
108
109         MockRaftActorContext context = createActorContext();
110         follower = new Follower(context);
111
112         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
113                 TimeUnit.MILLISECONDS);
114         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
115
116         assertTrue(raftBehavior instanceof Candidate);
117     }
118
119     @Test
120     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
121         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
122
123         MockRaftActorContext context = createActorContext();
124         ((DefaultConfigParamsImpl) context.getConfigParams()).
125                 setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
126         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
127
128         follower = new Follower(context);
129         context.setCurrentBehavior(follower);
130
131         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
132                 getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
133         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
134                 -1, -1, (short) 1));
135
136         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
137         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
138         assertTrue(raftBehavior instanceof Follower);
139
140         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
141                 getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
142         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
143                 -1, -1, (short) 1));
144
145         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
146         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
147         assertTrue(raftBehavior instanceof Follower);
148     }
149
150     @Test
151     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
152         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
153
154         MockRaftActorContext context = createActorContext();
155         long term = 1000;
156         context.getTermInformation().update(term, null);
157
158         follower = createBehavior(context);
159
160         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
161
162         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
163
164         assertEquals("isVoteGranted", true, reply.isVoteGranted());
165         assertEquals("getTerm", term, reply.getTerm());
166         verify(follower).scheduleElection(any(FiniteDuration.class));
167     }
168
169     @Test
170     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
171         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
172
173         MockRaftActorContext context = createActorContext();
174         long term = 1000;
175         context.getTermInformation().update(term, "test");
176
177         follower = createBehavior(context);
178
179         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
180
181         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
182
183         assertEquals("isVoteGranted", false, reply.isVoteGranted());
184         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
185     }
186
187
188     @Test
189     public void testHandleFirstAppendEntries() throws Exception {
190         logStart("testHandleFirstAppendEntries");
191
192         MockRaftActorContext context = createActorContext();
193         context.getReplicatedLog().clear(0,2);
194         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
195         context.getReplicatedLog().setSnapshotIndex(99);
196
197         List<ReplicatedLogEntry> entries = Arrays.asList(
198                 newReplicatedLogEntry(2, 101, "foo"));
199
200         Assert.assertEquals(1, context.getReplicatedLog().size());
201
202         // The new commitIndex is 101
203         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
204
205         follower = createBehavior(context);
206         follower.handleMessage(leaderActor, appendEntries);
207
208         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
209         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
210
211         assertFalse(syncStatus.isInitialSyncDone());
212         assertTrue("append entries reply should be true", reply.isSuccess());
213     }
214
215     @Test
216     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
217         logStart("testHandleFirstAppendEntries");
218
219         MockRaftActorContext context = createActorContext();
220
221         List<ReplicatedLogEntry> entries = Arrays.asList(
222                 newReplicatedLogEntry(2, 101, "foo"));
223
224         // The new commitIndex is 101
225         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
226
227         follower = createBehavior(context);
228         follower.handleMessage(leaderActor, appendEntries);
229
230         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
231         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
232
233         assertFalse(syncStatus.isInitialSyncDone());
234         assertFalse("append entries reply should be false", reply.isSuccess());
235     }
236
237     @Test
238     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
239         logStart("testHandleFirstAppendEntries");
240
241         MockRaftActorContext context = createActorContext();
242         context.getReplicatedLog().clear(0,2);
243         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
244         context.getReplicatedLog().setSnapshotIndex(99);
245
246         List<ReplicatedLogEntry> entries = Arrays.asList(
247                 newReplicatedLogEntry(2, 101, "foo"));
248
249         // The new commitIndex is 101
250         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
251
252         follower = createBehavior(context);
253         follower.handleMessage(leaderActor, appendEntries);
254
255         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
256         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
257
258         assertFalse(syncStatus.isInitialSyncDone());
259         assertTrue("append entries reply should be true", reply.isSuccess());
260     }
261
262     @Test
263     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
264         logStart("testHandleFirstAppendEntries");
265
266         MockRaftActorContext context = createActorContext();
267         context.getReplicatedLog().clear(0,2);
268         context.getReplicatedLog().setSnapshotIndex(100);
269
270         List<ReplicatedLogEntry> entries = Arrays.asList(
271                 newReplicatedLogEntry(2, 101, "foo"));
272
273         // The new commitIndex is 101
274         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
275
276         follower = createBehavior(context);
277         follower.handleMessage(leaderActor, appendEntries);
278
279         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
280         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
281
282         assertFalse(syncStatus.isInitialSyncDone());
283         assertTrue("append entries reply should be true", reply.isSuccess());
284     }
285
286     @Test
287     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
288         logStart("testHandleFirstAppendEntries");
289
290         MockRaftActorContext context = createActorContext();
291         context.getReplicatedLog().clear(0,2);
292         context.getReplicatedLog().setSnapshotIndex(100);
293
294         List<ReplicatedLogEntry> entries = Arrays.asList(
295                 newReplicatedLogEntry(2, 105, "foo"));
296
297         // The new commitIndex is 101
298         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
299
300         follower = createBehavior(context);
301         follower.handleMessage(leaderActor, appendEntries);
302
303         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
304         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
305
306         assertFalse(syncStatus.isInitialSyncDone());
307         assertFalse("append entries reply should be false", reply.isSuccess());
308     }
309
310     @Test
311     public void testHandleSyncUpAppendEntries() throws Exception {
312         logStart("testHandleSyncUpAppendEntries");
313
314         MockRaftActorContext context = createActorContext();
315
316         List<ReplicatedLogEntry> entries = Arrays.asList(
317                 newReplicatedLogEntry(2, 101, "foo"));
318
319         // The new commitIndex is 101
320         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
321
322         follower = createBehavior(context);
323         follower.handleMessage(leaderActor, appendEntries);
324
325         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
326
327         assertFalse(syncStatus.isInitialSyncDone());
328
329         // Clear all the messages
330         followerActor.underlyingActor().clear();
331
332         context.setLastApplied(101);
333         context.setCommitIndex(101);
334         setLastLogEntry(context, 1, 101,
335                 new MockRaftActorContext.MockPayload(""));
336
337         entries = Arrays.asList(
338                 newReplicatedLogEntry(2, 101, "foo"));
339
340         // The new commitIndex is 101
341         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
342         follower.handleMessage(leaderActor, appendEntries);
343
344         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
345
346         assertTrue(syncStatus.isInitialSyncDone());
347
348         followerActor.underlyingActor().clear();
349
350         // Sending the same message again should not generate another message
351
352         follower.handleMessage(leaderActor, appendEntries);
353
354         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
355
356         assertNull(syncStatus);
357
358     }
359
360     @Test
361     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
362         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
363
364         MockRaftActorContext context = createActorContext();
365
366         List<ReplicatedLogEntry> entries = Arrays.asList(
367                 newReplicatedLogEntry(2, 101, "foo"));
368
369         // The new commitIndex is 101
370         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
371
372         follower = createBehavior(context);
373         follower.handleMessage(leaderActor, appendEntries);
374
375         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
376
377         assertFalse(syncStatus.isInitialSyncDone());
378
379         // Clear all the messages
380         followerActor.underlyingActor().clear();
381
382         context.setLastApplied(100);
383         setLastLogEntry(context, 1, 100,
384                 new MockRaftActorContext.MockPayload(""));
385
386         entries = Arrays.asList(
387                 newReplicatedLogEntry(2, 101, "foo"));
388
389         // leader-2 is becoming the leader now and it says the commitIndex is 45
390         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
391         follower.handleMessage(leaderActor, appendEntries);
392
393         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
394
395         // We get a new message saying initial status is not done
396         assertFalse(syncStatus.isInitialSyncDone());
397
398     }
399
400
401     @Test
402     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
403         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
404
405         MockRaftActorContext context = createActorContext();
406
407         List<ReplicatedLogEntry> entries = Arrays.asList(
408                 newReplicatedLogEntry(2, 101, "foo"));
409
410         // The new commitIndex is 101
411         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
412
413         follower = createBehavior(context);
414         follower.handleMessage(leaderActor, appendEntries);
415
416         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
417
418         assertFalse(syncStatus.isInitialSyncDone());
419
420         // Clear all the messages
421         followerActor.underlyingActor().clear();
422
423         context.setLastApplied(101);
424         context.setCommitIndex(101);
425         setLastLogEntry(context, 1, 101,
426                 new MockRaftActorContext.MockPayload(""));
427
428         entries = Arrays.asList(
429                 newReplicatedLogEntry(2, 101, "foo"));
430
431         // The new commitIndex is 101
432         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
433         follower.handleMessage(leaderActor, appendEntries);
434
435         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
436
437         assertTrue(syncStatus.isInitialSyncDone());
438
439         // Clear all the messages
440         followerActor.underlyingActor().clear();
441
442         context.setLastApplied(100);
443         setLastLogEntry(context, 1, 100,
444                 new MockRaftActorContext.MockPayload(""));
445
446         entries = Arrays.asList(
447                 newReplicatedLogEntry(2, 101, "foo"));
448
449         // leader-2 is becoming the leader now and it says the commitIndex is 45
450         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
451         follower.handleMessage(leaderActor, appendEntries);
452
453         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
454
455         // We get a new message saying initial status is not done
456         assertFalse(syncStatus.isInitialSyncDone());
457
458     }
459
460
461     /**
462      * This test verifies that when an AppendEntries RPC is received by a RaftActor
463      * with a commitIndex that is greater than what has been applied to the
464      * state machine of the RaftActor, the RaftActor applies the state and
465      * sets it current applied state to the commitIndex of the sender.
466      *
467      * @throws Exception
468      */
469     @Test
470     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
471         logStart("testHandleAppendEntriesWithNewerCommitIndex");
472
473         MockRaftActorContext context = createActorContext();
474
475         context.setLastApplied(100);
476         setLastLogEntry(context, 1, 100,
477                 new MockRaftActorContext.MockPayload(""));
478         context.getReplicatedLog().setSnapshotIndex(99);
479
480         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
481                 newReplicatedLogEntry(2, 101, "foo"));
482
483         // The new commitIndex is 101
484         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
485
486         follower = createBehavior(context);
487         follower.handleMessage(leaderActor, appendEntries);
488
489         assertEquals("getLastApplied", 101L, context.getLastApplied());
490     }
491
492     /**
493      * This test verifies that when an AppendEntries is received a specific prevLogTerm
494      * which does not match the term that is in RaftActors log entry at prevLogIndex
495      * then the RaftActor does not change it's state and it returns a failure.
496      *
497      * @throws Exception
498      */
499     @Test
500     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
501         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
502
503         MockRaftActorContext context = createActorContext();
504
505         // First set the receivers term to lower number
506         context.getTermInformation().update(95, "test");
507
508         // AppendEntries is now sent with a bigger term
509         // this will set the receivers term to be the same as the sender's term
510         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
511
512         follower = createBehavior(context);
513
514         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
515
516         Assert.assertSame(follower, newBehavior);
517
518         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
519                 AppendEntriesReply.class);
520
521         assertEquals("isSuccess", false, reply.isSuccess());
522     }
523
524     /**
525      * This test verifies that when a new AppendEntries message is received with
526      * new entries and the logs of the sender and receiver match that the new
527      * entries get added to the log and the log is incremented by the number of
528      * entries received in appendEntries
529      *
530      * @throws Exception
531      */
532     @Test
533     public void testHandleAppendEntriesAddNewEntries() {
534         logStart("testHandleAppendEntriesAddNewEntries");
535
536         MockRaftActorContext context = createActorContext();
537
538         // First set the receivers term to lower number
539         context.getTermInformation().update(1, "test");
540
541         // Prepare the receivers log
542         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
543         log.append(newReplicatedLogEntry(1, 0, "zero"));
544         log.append(newReplicatedLogEntry(1, 1, "one"));
545         log.append(newReplicatedLogEntry(1, 2, "two"));
546
547         context.setReplicatedLog(log);
548
549         // Prepare the entries to be sent with AppendEntries
550         List<ReplicatedLogEntry> entries = new ArrayList<>();
551         entries.add(newReplicatedLogEntry(1, 3, "three"));
552         entries.add(newReplicatedLogEntry(1, 4, "four"));
553
554         // Send appendEntries with the same term as was set on the receiver
555         // before the new behavior was created (1 in this case)
556         // This will not work for a Candidate because as soon as a Candidate
557         // is created it increments the term
558         short leaderPayloadVersion = 10;
559         String leaderId = "leader-1";
560         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
561
562         follower = createBehavior(context);
563
564         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
565
566         Assert.assertSame(follower, newBehavior);
567
568         assertEquals("Next index", 5, log.last().getIndex() + 1);
569         assertEquals("Entry 3", entries.get(0), log.get(3));
570         assertEquals("Entry 4", entries.get(1), log.get(4));
571
572         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
573         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
574
575         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
576     }
577
578     /**
579      * This test verifies that when a new AppendEntries message is received with
580      * new entries and the logs of the sender and receiver are out-of-sync that
581      * the log is first corrected by removing the out of sync entries from the
582      * log and then adding in the new entries sent with the AppendEntries message
583      */
584     @Test
585     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
586         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
587
588         MockRaftActorContext context = createActorContext();
589
590         // First set the receivers term to lower number
591         context.getTermInformation().update(1, "test");
592
593         // Prepare the receivers log
594         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
595         log.append(newReplicatedLogEntry(1, 0, "zero"));
596         log.append(newReplicatedLogEntry(1, 1, "one"));
597         log.append(newReplicatedLogEntry(1, 2, "two"));
598
599         context.setReplicatedLog(log);
600
601         // Prepare the entries to be sent with AppendEntries
602         List<ReplicatedLogEntry> entries = new ArrayList<>();
603         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
604         entries.add(newReplicatedLogEntry(2, 3, "three"));
605
606         // Send appendEntries with the same term as was set on the receiver
607         // before the new behavior was created (1 in this case)
608         // This will not work for a Candidate because as soon as a Candidate
609         // is created it increments the term
610         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
611
612         follower = createBehavior(context);
613
614         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
615
616         Assert.assertSame(follower, newBehavior);
617
618         // The entry at index 2 will be found out-of-sync with the leader
619         // and will be removed
620         // Then the two new entries will be added to the log
621         // Thus making the log to have 4 entries
622         assertEquals("Next index", 4, log.last().getIndex() + 1);
623         //assertEquals("Entry 2", entries.get(0), log.get(2));
624
625         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
626
627         // Check that the entry at index 2 has the new data
628         assertEquals("Entry 2", entries.get(0), log.get(2));
629
630         assertEquals("Entry 3", entries.get(1), log.get(3));
631
632         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
633     }
634
635     @Test
636     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
637         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
638
639         MockRaftActorContext context = createActorContext();
640
641         // First set the receivers term to lower number
642         context.getTermInformation().update(1, "test");
643
644         // Prepare the receivers log
645         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
646         log.append(newReplicatedLogEntry(1, 0, "zero"));
647         log.append(newReplicatedLogEntry(1, 1, "one"));
648         log.append(newReplicatedLogEntry(1, 2, "two"));
649
650         context.setReplicatedLog(log);
651
652         // Prepare the entries to be sent with AppendEntries
653         List<ReplicatedLogEntry> entries = new ArrayList<>();
654         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
655         entries.add(newReplicatedLogEntry(2, 3, "three"));
656
657         // Send appendEntries with the same term as was set on the receiver
658         // before the new behavior was created (1 in this case)
659         // This will not work for a Candidate because as soon as a Candidate
660         // is created it increments the term
661         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
662
663         context.setRaftPolicy(createRaftPolicy(false, true));
664         follower = createBehavior(context);
665
666         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
667
668         Assert.assertSame(follower, newBehavior);
669
670         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
671     }
672
673     @Test
674     public void testHandleAppendEntriesPreviousLogEntryMissing(){
675         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
676
677         MockRaftActorContext context = createActorContext();
678
679         // Prepare the receivers log
680         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
681         log.append(newReplicatedLogEntry(1, 0, "zero"));
682         log.append(newReplicatedLogEntry(1, 1, "one"));
683         log.append(newReplicatedLogEntry(1, 2, "two"));
684
685         context.setReplicatedLog(log);
686
687         // Prepare the entries to be sent with AppendEntries
688         List<ReplicatedLogEntry> entries = new ArrayList<>();
689         entries.add(newReplicatedLogEntry(1, 4, "four"));
690
691         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
692
693         follower = createBehavior(context);
694
695         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
696
697         Assert.assertSame(follower, newBehavior);
698
699         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
700     }
701
702     @Test
703     public void testHandleAppendEntriesWithExistingLogEntry() {
704         logStart("testHandleAppendEntriesWithExistingLogEntry");
705
706         MockRaftActorContext context = createActorContext();
707
708         context.getTermInformation().update(1, "test");
709
710         // Prepare the receivers log
711         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
712         log.append(newReplicatedLogEntry(1, 0, "zero"));
713         log.append(newReplicatedLogEntry(1, 1, "one"));
714
715         context.setReplicatedLog(log);
716
717         // Send the last entry again.
718         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
719
720         follower = createBehavior(context);
721
722         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
723
724         assertEquals("Next index", 2, log.last().getIndex() + 1);
725         assertEquals("Entry 1", entries.get(0), log.get(1));
726
727         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
728
729         // Send the last entry again and also a new one.
730
731         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
732
733         leaderActor.underlyingActor().clear();
734         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
735
736         assertEquals("Next index", 3, log.last().getIndex() + 1);
737         assertEquals("Entry 1", entries.get(0), log.get(1));
738         assertEquals("Entry 2", entries.get(1), log.get(2));
739
740         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
741     }
742
743     @Test
744     public void testHandleAppendEntriesAfterInstallingSnapshot(){
745         logStart("testHandleAppendAfterInstallingSnapshot");
746
747         MockRaftActorContext context = createActorContext();
748
749         // Prepare the receivers log
750         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
751
752         // Set up a log as if it has been snapshotted
753         log.setSnapshotIndex(3);
754         log.setSnapshotTerm(1);
755
756         context.setReplicatedLog(log);
757
758         // Prepare the entries to be sent with AppendEntries
759         List<ReplicatedLogEntry> entries = new ArrayList<>();
760         entries.add(newReplicatedLogEntry(1, 4, "four"));
761
762         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
763
764         follower = createBehavior(context);
765
766         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
767
768         Assert.assertSame(follower, newBehavior);
769
770         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
771     }
772
773
774     /**
775      * This test verifies that when InstallSnapshot is received by
776      * the follower its applied correctly.
777      *
778      * @throws Exception
779      */
780     @Test
781     public void testHandleInstallSnapshot() throws Exception {
782         logStart("testHandleInstallSnapshot");
783
784         MockRaftActorContext context = createActorContext();
785         context.getTermInformation().update(1, "leader");
786
787         follower = createBehavior(context);
788
789         ByteString bsSnapshot  = createSnapshot();
790         int offset = 0;
791         int snapshotLength = bsSnapshot.size();
792         int chunkSize = 50;
793         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
794         int lastIncludedIndex = 1;
795         int chunkIndex = 1;
796         InstallSnapshot lastInstallSnapshot = null;
797
798         for(int i = 0; i < totalChunks; i++) {
799             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
800             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
801                     chunkData, chunkIndex, totalChunks);
802             follower.handleMessage(leaderActor, lastInstallSnapshot);
803             offset = offset + 50;
804             lastIncludedIndex++;
805             chunkIndex++;
806         }
807
808         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
809                 ApplySnapshot.class);
810         Snapshot snapshot = applySnapshot.getSnapshot();
811         assertNotNull(lastInstallSnapshot);
812         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
813         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
814                 snapshot.getLastAppliedTerm());
815         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
816                 snapshot.getLastAppliedIndex());
817         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
818         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
819         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
820         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
821         applySnapshot.getCallback().onSuccess();
822
823         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
824                 leaderActor, InstallSnapshotReply.class);
825         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
826
827         chunkIndex = 1;
828         for(InstallSnapshotReply reply: replies) {
829             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
830             assertEquals("getTerm", 1, reply.getTerm());
831             assertEquals("isSuccess", true, reply.isSuccess());
832             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
833         }
834
835         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
836     }
837
838
839     /**
840      * Verify that when an AppendEntries is sent to a follower during a snapshot install
841      * the Follower short-circuits the processing of the AppendEntries message.
842      *
843      * @throws Exception
844      */
845     @Test
846     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
847         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
848
849         MockRaftActorContext context = createActorContext();
850
851         follower = createBehavior(context);
852
853         ByteString bsSnapshot  = createSnapshot();
854         int snapshotLength = bsSnapshot.size();
855         int chunkSize = 50;
856         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
857         int lastIncludedIndex = 1;
858
859         // Check that snapshot installation is not in progress
860         assertNull(follower.getSnapshotTracker());
861
862         // Make sure that we have more than 1 chunk to send
863         assertTrue(totalChunks > 1);
864
865         // Send an install snapshot with the first chunk to start the process of installing a snapshot
866         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
867         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
868                 chunkData, 1, totalChunks));
869
870         // Check if snapshot installation is in progress now
871         assertNotNull(follower.getSnapshotTracker());
872
873         // Send an append entry
874         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
875                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
876
877         follower.handleMessage(leaderActor, appendEntries);
878
879         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
880         assertEquals("isSuccess", true, reply.isSuccess());
881         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
882         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
883         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
884
885         assertNotNull(follower.getSnapshotTracker());
886     }
887
888     @Test
889     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
890         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
891
892         MockRaftActorContext context = createActorContext();
893
894         follower = createBehavior(context);
895
896         ByteString bsSnapshot  = createSnapshot();
897         int snapshotLength = bsSnapshot.size();
898         int chunkSize = 50;
899         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
900         int lastIncludedIndex = 1;
901
902         // Check that snapshot installation is not in progress
903         assertNull(follower.getSnapshotTracker());
904
905         // Make sure that we have more than 1 chunk to send
906         assertTrue(totalChunks > 1);
907
908         // Send an install snapshot with the first chunk to start the process of installing a snapshot
909         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
910         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
911                 chunkData, 1, totalChunks));
912
913         // Check if snapshot installation is in progress now
914         assertNotNull(follower.getSnapshotTracker());
915
916         // Send appendEntries with a new term and leader.
917         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
918                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
919
920         follower.handleMessage(leaderActor, appendEntries);
921
922         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
923         assertEquals("isSuccess", true, reply.isSuccess());
924         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
925         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
926         assertEquals("getTerm", 2, reply.getTerm());
927
928         assertNull(follower.getSnapshotTracker());
929     }
930
931     @Test
932     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
933         logStart("testInitialSyncUpWithHandleInstallSnapshot");
934
935         MockRaftActorContext context = createActorContext();
936         context.setCommitIndex(-1);
937
938         follower = createBehavior(context);
939
940         ByteString bsSnapshot  = createSnapshot();
941         int offset = 0;
942         int snapshotLength = bsSnapshot.size();
943         int chunkSize = 50;
944         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
945         int lastIncludedIndex = 1;
946         int chunkIndex = 1;
947         InstallSnapshot lastInstallSnapshot = null;
948
949         for(int i = 0; i < totalChunks; i++) {
950             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
951             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
952                     chunkData, chunkIndex, totalChunks);
953             follower.handleMessage(leaderActor, lastInstallSnapshot);
954             offset = offset + 50;
955             lastIncludedIndex++;
956             chunkIndex++;
957         }
958
959         FollowerInitialSyncUpStatus syncStatus =
960                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
961
962         assertFalse(syncStatus.isInitialSyncDone());
963
964         // Clear all the messages
965         followerActor.underlyingActor().clear();
966
967         context.setLastApplied(101);
968         context.setCommitIndex(101);
969         setLastLogEntry(context, 1, 101,
970                 new MockRaftActorContext.MockPayload(""));
971
972         List<ReplicatedLogEntry> entries = Arrays.asList(
973                 newReplicatedLogEntry(2, 101, "foo"));
974
975         // The new commitIndex is 101
976         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
977         follower.handleMessage(leaderActor, appendEntries);
978
979         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
980
981         assertTrue(syncStatus.isInitialSyncDone());
982     }
983
984     @Test
985     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
986         logStart("testHandleOutOfSequenceInstallSnapshot");
987
988         MockRaftActorContext context = createActorContext();
989
990         follower = createBehavior(context);
991
992         ByteString bsSnapshot = createSnapshot();
993
994         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
995                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
996         follower.handleMessage(leaderActor, installSnapshot);
997
998         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
999                 InstallSnapshotReply.class);
1000
1001         assertEquals("isSuccess", false, reply.isSuccess());
1002         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1003         assertEquals("getTerm", 1, reply.getTerm());
1004         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1005
1006         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1007     }
1008
1009     @Test
1010     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
1011         MockRaftActorContext context = createActorContext();
1012
1013         Stopwatch stopwatch = Stopwatch.createStarted();
1014
1015         follower = createBehavior(context);
1016
1017         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1018
1019         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1020
1021         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1022
1023         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1024         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1025     }
1026
1027     @Test
1028     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
1029         MockRaftActorContext context = createActorContext();
1030         context.setConfigParams(new DefaultConfigParamsImpl(){
1031             @Override
1032             public FiniteDuration getElectionTimeOutInterval() {
1033                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1034             }
1035         });
1036
1037         context.setRaftPolicy(createRaftPolicy(false, false));
1038
1039         follower = createBehavior(context);
1040
1041         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1042         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1043         assertSame("handleMessage result", follower, newBehavior);
1044     }
1045
1046     @Test
1047     public void testFollowerSchedulesElectionIfNonVoting(){
1048         MockRaftActorContext context = createActorContext();
1049         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1050         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1051                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1052         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1053
1054         follower = new Follower(context, "leader", (short)1);
1055
1056         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1057                 ElectionTimeout.class);
1058         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1059         assertSame("handleMessage result", follower, newBehavior);
1060         assertNull("Expected null leaderId", follower.getLeaderId());
1061     }
1062
1063     @Test
1064     public void testElectionScheduledWhenAnyRaftRPCReceived(){
1065         MockRaftActorContext context = createActorContext();
1066         follower = createBehavior(context);
1067         follower.handleMessage(leaderActor, new RaftRPC() {
1068             private static final long serialVersionUID = 1L;
1069
1070             @Override
1071             public long getTerm() {
1072                 return 100;
1073             }
1074         });
1075         verify(follower).scheduleElection(any(FiniteDuration.class));
1076     }
1077
1078     @Test
1079     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
1080         MockRaftActorContext context = createActorContext();
1081         follower = createBehavior(context);
1082         follower.handleMessage(leaderActor, "non-raft-rpc");
1083         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1084     }
1085
1086     public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
1087         int snapshotLength = bs.size();
1088         int start = offset;
1089         int size = chunkSize;
1090         if (chunkSize > snapshotLength) {
1091             size = snapshotLength;
1092         } else {
1093             if (start + chunkSize > snapshotLength) {
1094                 size = snapshotLength - start;
1095             }
1096         }
1097
1098         byte[] nextChunk = new byte[size];
1099         bs.copyTo(nextChunk, start, 0, size);
1100         return nextChunk;
1101     }
1102
1103     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1104             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1105         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1106     }
1107
1108     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1109                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1110                                                    boolean expForceInstallSnapshot) {
1111
1112         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1113                 AppendEntriesReply.class);
1114
1115         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1116         assertEquals("getTerm", expTerm, reply.getTerm());
1117         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1118         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1119         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1120         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1121         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1122     }
1123
1124
1125     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1126         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
1127                 new MockRaftActorContext.MockPayload(data));
1128     }
1129
1130     private ByteString createSnapshot(){
1131         HashMap<String, String> followerSnapshot = new HashMap<>();
1132         followerSnapshot.put("1", "A");
1133         followerSnapshot.put("2", "B");
1134         followerSnapshot.put("3", "C");
1135
1136         return toByteString(followerSnapshot);
1137     }
1138
1139     @Override
1140     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1141             ActorRef actorRef, RaftRPC rpc) throws Exception {
1142         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1143
1144         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1145         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1146     }
1147
1148     @Override
1149     protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1150             throws Exception {
1151         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1152         assertEquals("isSuccess", true, reply.isSuccess());
1153     }
1154 }