9d64b140fb7d9db2c191eb12ffc847deb5273561
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.testkit.TestActorRef;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.google.protobuf.ByteString;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
38 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
39 import org.opendaylight.controller.cluster.raft.RaftActorContext;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.cluster.raft.Snapshot;
42 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
44 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
45 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
48 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
49 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
50 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
51 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
52 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
53 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
54 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
55 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
56 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
57 import scala.concurrent.duration.FiniteDuration;
58
59 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
60
61     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
62             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
63
64     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
65             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
66
67     private Follower follower;
68
69     private final short payloadVersion = 5;
70
71     @Override
72     @After
73     public void tearDown() throws Exception {
74         if (follower != null) {
75             follower.close();
76         }
77
78         super.tearDown();
79     }
80
81     @Override
82     protected Follower createBehavior(RaftActorContext actorContext) {
83         return spy(new Follower(actorContext));
84     }
85
86     @Override
87     protected  MockRaftActorContext createActorContext() {
88         return createActorContext(followerActor);
89     }
90
91     @Override
92     protected  MockRaftActorContext createActorContext(ActorRef actorRef) {
93         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
94         context.setPayloadVersion(payloadVersion );
95         return context;
96     }
97
98     @Test
99     public void testThatAnElectionTimeoutIsTriggered() {
100         MockRaftActorContext actorContext = createActorContext();
101         follower = new Follower(actorContext);
102
103         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
104                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
105     }
106
107     @Test
108     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
109         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
110
111         MockRaftActorContext context = createActorContext();
112         follower = new Follower(context);
113
114         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
115                 TimeUnit.MILLISECONDS);
116         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
117
118         assertTrue(raftBehavior instanceof Candidate);
119     }
120
121     @Test
122     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
123         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
124
125         MockRaftActorContext context = createActorContext();
126         ((DefaultConfigParamsImpl) context.getConfigParams())
127                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
128         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
129
130         follower = new Follower(context);
131         context.setCurrentBehavior(follower);
132
133         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
134                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
135         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
136                 -1, -1, (short) 1));
137
138         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
139         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
140         assertTrue(raftBehavior instanceof Follower);
141
142         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
143                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
144         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
145                 -1, -1, (short) 1));
146
147         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
148         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
149         assertTrue(raftBehavior instanceof Follower);
150     }
151
152     @Test
153     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
154         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
155
156         MockRaftActorContext context = createActorContext();
157         long term = 1000;
158         context.getTermInformation().update(term, null);
159
160         follower = createBehavior(context);
161
162         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
163
164         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
165
166         assertEquals("isVoteGranted", true, reply.isVoteGranted());
167         assertEquals("getTerm", term, reply.getTerm());
168         verify(follower).scheduleElection(any(FiniteDuration.class));
169     }
170
171     @Test
172     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
173         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
174
175         MockRaftActorContext context = createActorContext();
176         long term = 1000;
177         context.getTermInformation().update(term, "test");
178
179         follower = createBehavior(context);
180
181         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
182
183         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
184
185         assertEquals("isVoteGranted", false, reply.isVoteGranted());
186         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
187     }
188
189
190     @Test
191     public void testHandleFirstAppendEntries() throws Exception {
192         logStart("testHandleFirstAppendEntries");
193
194         MockRaftActorContext context = createActorContext();
195         context.getReplicatedLog().clear(0,2);
196         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
197         context.getReplicatedLog().setSnapshotIndex(99);
198
199         List<ReplicatedLogEntry> entries = Arrays.asList(
200                 newReplicatedLogEntry(2, 101, "foo"));
201
202         Assert.assertEquals(1, context.getReplicatedLog().size());
203
204         // The new commitIndex is 101
205         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
206
207         follower = createBehavior(context);
208         follower.handleMessage(leaderActor, appendEntries);
209
210         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
211                 FollowerInitialSyncUpStatus.class);
212         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
213
214         assertFalse(syncStatus.isInitialSyncDone());
215         assertTrue("append entries reply should be true", reply.isSuccess());
216     }
217
218     @Test
219     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
220         logStart("testHandleFirstAppendEntries");
221
222         MockRaftActorContext context = createActorContext();
223
224         List<ReplicatedLogEntry> entries = Arrays.asList(
225                 newReplicatedLogEntry(2, 101, "foo"));
226
227         // The new commitIndex is 101
228         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
229
230         follower = createBehavior(context);
231         follower.handleMessage(leaderActor, appendEntries);
232
233         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
234                 FollowerInitialSyncUpStatus.class);
235         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
236
237         assertFalse(syncStatus.isInitialSyncDone());
238         assertFalse("append entries reply should be false", reply.isSuccess());
239     }
240
241     @Test
242     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
243             throws Exception {
244         logStart("testHandleFirstAppendEntries");
245
246         MockRaftActorContext context = createActorContext();
247         context.getReplicatedLog().clear(0,2);
248         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
249         context.getReplicatedLog().setSnapshotIndex(99);
250
251         List<ReplicatedLogEntry> entries = Arrays.asList(
252                 newReplicatedLogEntry(2, 101, "foo"));
253
254         // The new commitIndex is 101
255         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
256
257         follower = createBehavior(context);
258         follower.handleMessage(leaderActor, appendEntries);
259
260         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
261                 FollowerInitialSyncUpStatus.class);
262         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
263
264         assertFalse(syncStatus.isInitialSyncDone());
265         assertTrue("append entries reply should be true", reply.isSuccess());
266     }
267
268     @Test
269     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
270             throws Exception {
271         logStart("testHandleFirstAppendEntries");
272
273         MockRaftActorContext context = createActorContext();
274         context.getReplicatedLog().clear(0,2);
275         context.getReplicatedLog().setSnapshotIndex(100);
276
277         List<ReplicatedLogEntry> entries = Arrays.asList(
278                 newReplicatedLogEntry(2, 101, "foo"));
279
280         // The new commitIndex is 101
281         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
282
283         follower = createBehavior(context);
284         follower.handleMessage(leaderActor, appendEntries);
285
286         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
287                 FollowerInitialSyncUpStatus.class);
288         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
289
290         assertFalse(syncStatus.isInitialSyncDone());
291         assertTrue("append entries reply should be true", reply.isSuccess());
292     }
293
294     @Test
295     public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
296             throws Exception {
297         logStart(
298                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
299
300         MockRaftActorContext context = createActorContext();
301         context.getReplicatedLog().clear(0,2);
302         context.getReplicatedLog().setSnapshotIndex(100);
303
304         List<ReplicatedLogEntry> entries = Arrays.asList(
305                 newReplicatedLogEntry(2, 105, "foo"));
306
307         // The new commitIndex is 101
308         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
309
310         follower = createBehavior(context);
311         follower.handleMessage(leaderActor, appendEntries);
312
313         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
314                 FollowerInitialSyncUpStatus.class);
315         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
316
317         assertFalse(syncStatus.isInitialSyncDone());
318         assertFalse("append entries reply should be false", reply.isSuccess());
319     }
320
321     @Test
322     public void testHandleSyncUpAppendEntries() throws Exception {
323         logStart("testHandleSyncUpAppendEntries");
324
325         MockRaftActorContext context = createActorContext();
326
327         List<ReplicatedLogEntry> entries = Arrays.asList(
328                 newReplicatedLogEntry(2, 101, "foo"));
329
330         // The new commitIndex is 101
331         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
332
333         follower = createBehavior(context);
334         follower.handleMessage(leaderActor, appendEntries);
335
336         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
337                 FollowerInitialSyncUpStatus.class);
338
339         assertFalse(syncStatus.isInitialSyncDone());
340
341         // Clear all the messages
342         followerActor.underlyingActor().clear();
343
344         context.setLastApplied(101);
345         context.setCommitIndex(101);
346         setLastLogEntry(context, 1, 101,
347                 new MockRaftActorContext.MockPayload(""));
348
349         entries = Arrays.asList(
350                 newReplicatedLogEntry(2, 101, "foo"));
351
352         // The new commitIndex is 101
353         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
354         follower.handleMessage(leaderActor, appendEntries);
355
356         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
357
358         assertTrue(syncStatus.isInitialSyncDone());
359
360         followerActor.underlyingActor().clear();
361
362         // Sending the same message again should not generate another message
363
364         follower.handleMessage(leaderActor, appendEntries);
365
366         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
367
368         assertNull(syncStatus);
369
370     }
371
372     @Test
373     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
374         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
375
376         MockRaftActorContext context = createActorContext();
377
378         List<ReplicatedLogEntry> entries = Arrays.asList(
379                 newReplicatedLogEntry(2, 101, "foo"));
380
381         // The new commitIndex is 101
382         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
383
384         follower = createBehavior(context);
385         follower.handleMessage(leaderActor, appendEntries);
386
387         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
388                 FollowerInitialSyncUpStatus.class);
389
390         assertFalse(syncStatus.isInitialSyncDone());
391
392         // Clear all the messages
393         followerActor.underlyingActor().clear();
394
395         context.setLastApplied(100);
396         setLastLogEntry(context, 1, 100,
397                 new MockRaftActorContext.MockPayload(""));
398
399         entries = Arrays.asList(
400                 newReplicatedLogEntry(2, 101, "foo"));
401
402         // leader-2 is becoming the leader now and it says the commitIndex is 45
403         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
404         follower.handleMessage(leaderActor, appendEntries);
405
406         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
407
408         // We get a new message saying initial status is not done
409         assertFalse(syncStatus.isInitialSyncDone());
410
411     }
412
413
414     @Test
415     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
416         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
417
418         MockRaftActorContext context = createActorContext();
419
420         List<ReplicatedLogEntry> entries = Arrays.asList(
421                 newReplicatedLogEntry(2, 101, "foo"));
422
423         // The new commitIndex is 101
424         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
425
426         follower = createBehavior(context);
427         follower.handleMessage(leaderActor, appendEntries);
428
429         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
430                 FollowerInitialSyncUpStatus.class);
431
432         assertFalse(syncStatus.isInitialSyncDone());
433
434         // Clear all the messages
435         followerActor.underlyingActor().clear();
436
437         context.setLastApplied(101);
438         context.setCommitIndex(101);
439         setLastLogEntry(context, 1, 101,
440                 new MockRaftActorContext.MockPayload(""));
441
442         entries = Arrays.asList(
443                 newReplicatedLogEntry(2, 101, "foo"));
444
445         // The new commitIndex is 101
446         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
447         follower.handleMessage(leaderActor, appendEntries);
448
449         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
450
451         assertTrue(syncStatus.isInitialSyncDone());
452
453         // Clear all the messages
454         followerActor.underlyingActor().clear();
455
456         context.setLastApplied(100);
457         setLastLogEntry(context, 1, 100,
458                 new MockRaftActorContext.MockPayload(""));
459
460         entries = Arrays.asList(
461                 newReplicatedLogEntry(2, 101, "foo"));
462
463         // leader-2 is becoming the leader now and it says the commitIndex is 45
464         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
465         follower.handleMessage(leaderActor, appendEntries);
466
467         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
468
469         // We get a new message saying initial status is not done
470         assertFalse(syncStatus.isInitialSyncDone());
471
472     }
473
474
475     /**
476      * This test verifies that when an AppendEntries RPC is received by a RaftActor
477      * with a commitIndex that is greater than what has been applied to the
478      * state machine of the RaftActor, the RaftActor applies the state and
479      * sets it current applied state to the commitIndex of the sender.
480      */
481     @Test
482     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
483         logStart("testHandleAppendEntriesWithNewerCommitIndex");
484
485         MockRaftActorContext context = createActorContext();
486
487         context.setLastApplied(100);
488         setLastLogEntry(context, 1, 100,
489                 new MockRaftActorContext.MockPayload(""));
490         context.getReplicatedLog().setSnapshotIndex(99);
491
492         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
493                 newReplicatedLogEntry(2, 101, "foo"));
494
495         // The new commitIndex is 101
496         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
497
498         follower = createBehavior(context);
499         follower.handleMessage(leaderActor, appendEntries);
500
501         assertEquals("getLastApplied", 101L, context.getLastApplied());
502     }
503
504     /**
505      * This test verifies that when an AppendEntries is received a specific prevLogTerm
506      * which does not match the term that is in RaftActors log entry at prevLogIndex
507      * then the RaftActor does not change it's state and it returns a failure.
508      */
509     @Test
510     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
511         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
512
513         MockRaftActorContext context = createActorContext();
514
515         // First set the receivers term to lower number
516         context.getTermInformation().update(95, "test");
517
518         // AppendEntries is now sent with a bigger term
519         // this will set the receivers term to be the same as the sender's term
520         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
521
522         follower = createBehavior(context);
523
524         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
525
526         Assert.assertSame(follower, newBehavior);
527
528         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
529                 AppendEntriesReply.class);
530
531         assertEquals("isSuccess", false, reply.isSuccess());
532     }
533
534     /**
535      * This test verifies that when a new AppendEntries message is received with
536      * new entries and the logs of the sender and receiver match that the new
537      * entries get added to the log and the log is incremented by the number of
538      * entries received in appendEntries.
539      */
540     @Test
541     public void testHandleAppendEntriesAddNewEntries() {
542         logStart("testHandleAppendEntriesAddNewEntries");
543
544         MockRaftActorContext context = createActorContext();
545
546         // First set the receivers term to lower number
547         context.getTermInformation().update(1, "test");
548
549         // Prepare the receivers log
550         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
551         log.append(newReplicatedLogEntry(1, 0, "zero"));
552         log.append(newReplicatedLogEntry(1, 1, "one"));
553         log.append(newReplicatedLogEntry(1, 2, "two"));
554
555         context.setReplicatedLog(log);
556
557         // Prepare the entries to be sent with AppendEntries
558         List<ReplicatedLogEntry> entries = new ArrayList<>();
559         entries.add(newReplicatedLogEntry(1, 3, "three"));
560         entries.add(newReplicatedLogEntry(1, 4, "four"));
561
562         // Send appendEntries with the same term as was set on the receiver
563         // before the new behavior was created (1 in this case)
564         // This will not work for a Candidate because as soon as a Candidate
565         // is created it increments the term
566         short leaderPayloadVersion = 10;
567         String leaderId = "leader-1";
568         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
569
570         follower = createBehavior(context);
571
572         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
573
574         Assert.assertSame(follower, newBehavior);
575
576         assertEquals("Next index", 5, log.last().getIndex() + 1);
577         assertEquals("Entry 3", entries.get(0), log.get(3));
578         assertEquals("Entry 4", entries.get(1), log.get(4));
579
580         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
581         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
582
583         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
584     }
585
586     /**
587      * This test verifies that when a new AppendEntries message is received with
588      * new entries and the logs of the sender and receiver are out-of-sync that
589      * the log is first corrected by removing the out of sync entries from the
590      * log and then adding in the new entries sent with the AppendEntries message.
591      */
592     @Test
593     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
594         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
595
596         MockRaftActorContext context = createActorContext();
597
598         // First set the receivers term to lower number
599         context.getTermInformation().update(1, "test");
600
601         // Prepare the receivers log
602         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
603         log.append(newReplicatedLogEntry(1, 0, "zero"));
604         log.append(newReplicatedLogEntry(1, 1, "one"));
605         log.append(newReplicatedLogEntry(1, 2, "two"));
606
607         context.setReplicatedLog(log);
608
609         // Prepare the entries to be sent with AppendEntries
610         List<ReplicatedLogEntry> entries = new ArrayList<>();
611         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
612         entries.add(newReplicatedLogEntry(2, 3, "three"));
613
614         // Send appendEntries with the same term as was set on the receiver
615         // before the new behavior was created (1 in this case)
616         // This will not work for a Candidate because as soon as a Candidate
617         // is created it increments the term
618         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
619
620         follower = createBehavior(context);
621
622         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
623
624         Assert.assertSame(follower, newBehavior);
625
626         // The entry at index 2 will be found out-of-sync with the leader
627         // and will be removed
628         // Then the two new entries will be added to the log
629         // Thus making the log to have 4 entries
630         assertEquals("Next index", 4, log.last().getIndex() + 1);
631         //assertEquals("Entry 2", entries.get(0), log.get(2));
632
633         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
634
635         // Check that the entry at index 2 has the new data
636         assertEquals("Entry 2", entries.get(0), log.get(2));
637
638         assertEquals("Entry 3", entries.get(1), log.get(3));
639
640         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
641     }
642
643     @Test
644     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
645         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
646
647         MockRaftActorContext context = createActorContext();
648
649         // First set the receivers term to lower number
650         context.getTermInformation().update(1, "test");
651
652         // Prepare the receivers log
653         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
654         log.append(newReplicatedLogEntry(1, 0, "zero"));
655         log.append(newReplicatedLogEntry(1, 1, "one"));
656         log.append(newReplicatedLogEntry(1, 2, "two"));
657
658         context.setReplicatedLog(log);
659
660         // Prepare the entries to be sent with AppendEntries
661         List<ReplicatedLogEntry> entries = new ArrayList<>();
662         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
663         entries.add(newReplicatedLogEntry(2, 3, "three"));
664
665         // Send appendEntries with the same term as was set on the receiver
666         // before the new behavior was created (1 in this case)
667         // This will not work for a Candidate because as soon as a Candidate
668         // is created it increments the term
669         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
670
671         context.setRaftPolicy(createRaftPolicy(false, true));
672         follower = createBehavior(context);
673
674         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
675
676         Assert.assertSame(follower, newBehavior);
677
678         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
679     }
680
681     @Test
682     public void testHandleAppendEntriesPreviousLogEntryMissing() {
683         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
684
685         final MockRaftActorContext context = createActorContext();
686
687         // Prepare the receivers log
688         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
689         log.append(newReplicatedLogEntry(1, 0, "zero"));
690         log.append(newReplicatedLogEntry(1, 1, "one"));
691         log.append(newReplicatedLogEntry(1, 2, "two"));
692
693         context.setReplicatedLog(log);
694
695         // Prepare the entries to be sent with AppendEntries
696         List<ReplicatedLogEntry> entries = new ArrayList<>();
697         entries.add(newReplicatedLogEntry(1, 4, "four"));
698
699         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
700
701         follower = createBehavior(context);
702
703         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
704
705         Assert.assertSame(follower, newBehavior);
706
707         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
708     }
709
710     @Test
711     public void testHandleAppendEntriesWithExistingLogEntry() {
712         logStart("testHandleAppendEntriesWithExistingLogEntry");
713
714         MockRaftActorContext context = createActorContext();
715
716         context.getTermInformation().update(1, "test");
717
718         // Prepare the receivers log
719         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
720         log.append(newReplicatedLogEntry(1, 0, "zero"));
721         log.append(newReplicatedLogEntry(1, 1, "one"));
722
723         context.setReplicatedLog(log);
724
725         // Send the last entry again.
726         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
727
728         follower = createBehavior(context);
729
730         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
731
732         assertEquals("Next index", 2, log.last().getIndex() + 1);
733         assertEquals("Entry 1", entries.get(0), log.get(1));
734
735         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
736
737         // Send the last entry again and also a new one.
738
739         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
740
741         leaderActor.underlyingActor().clear();
742         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
743
744         assertEquals("Next index", 3, log.last().getIndex() + 1);
745         assertEquals("Entry 1", entries.get(0), log.get(1));
746         assertEquals("Entry 2", entries.get(1), log.get(2));
747
748         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
749     }
750
751     @Test
752     public void testHandleAppendEntriesAfterInstallingSnapshot() {
753         logStart("testHandleAppendAfterInstallingSnapshot");
754
755         MockRaftActorContext context = createActorContext();
756
757         // Prepare the receivers log
758         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
759
760         // Set up a log as if it has been snapshotted
761         log.setSnapshotIndex(3);
762         log.setSnapshotTerm(1);
763
764         context.setReplicatedLog(log);
765
766         // Prepare the entries to be sent with AppendEntries
767         List<ReplicatedLogEntry> entries = new ArrayList<>();
768         entries.add(newReplicatedLogEntry(1, 4, "four"));
769
770         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
771
772         follower = createBehavior(context);
773
774         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
775
776         Assert.assertSame(follower, newBehavior);
777
778         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
779     }
780
781
782     /**
783      * This test verifies that when InstallSnapshot is received by
784      * the follower its applied correctly.
785      */
786     @Test
787     public void testHandleInstallSnapshot() throws Exception {
788         logStart("testHandleInstallSnapshot");
789
790         MockRaftActorContext context = createActorContext();
791         context.getTermInformation().update(1, "leader");
792
793         follower = createBehavior(context);
794
795         ByteString bsSnapshot  = createSnapshot();
796         int offset = 0;
797         int snapshotLength = bsSnapshot.size();
798         int chunkSize = 50;
799         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
800         int lastIncludedIndex = 1;
801         int chunkIndex = 1;
802         InstallSnapshot lastInstallSnapshot = null;
803
804         for (int i = 0; i < totalChunks; i++) {
805             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
806             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
807                     chunkData, chunkIndex, totalChunks);
808             follower.handleMessage(leaderActor, lastInstallSnapshot);
809             offset = offset + 50;
810             lastIncludedIndex++;
811             chunkIndex++;
812         }
813
814         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
815                 ApplySnapshot.class);
816         Snapshot snapshot = applySnapshot.getSnapshot();
817         assertNotNull(lastInstallSnapshot);
818         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
819         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
820                 snapshot.getLastAppliedTerm());
821         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
822                 snapshot.getLastAppliedIndex());
823         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
824         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
825         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
826         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
827         applySnapshot.getCallback().onSuccess();
828
829         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
830                 leaderActor, InstallSnapshotReply.class);
831         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
832
833         chunkIndex = 1;
834         for (InstallSnapshotReply reply: replies) {
835             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
836             assertEquals("getTerm", 1, reply.getTerm());
837             assertEquals("isSuccess", true, reply.isSuccess());
838             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
839         }
840
841         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
842     }
843
844
845     /**
846      * Verify that when an AppendEntries is sent to a follower during a snapshot install
847      * the Follower short-circuits the processing of the AppendEntries message.
848      */
849     @Test
850     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
851         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
852
853         MockRaftActorContext context = createActorContext();
854
855         follower = createBehavior(context);
856
857         ByteString bsSnapshot  = createSnapshot();
858         int snapshotLength = bsSnapshot.size();
859         int chunkSize = 50;
860         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
861         int lastIncludedIndex = 1;
862
863         // Check that snapshot installation is not in progress
864         assertNull(follower.getSnapshotTracker());
865
866         // Make sure that we have more than 1 chunk to send
867         assertTrue(totalChunks > 1);
868
869         // Send an install snapshot with the first chunk to start the process of installing a snapshot
870         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
871         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
872                 chunkData, 1, totalChunks));
873
874         // Check if snapshot installation is in progress now
875         assertNotNull(follower.getSnapshotTracker());
876
877         // Send an append entry
878         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
879                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
880
881         follower.handleMessage(leaderActor, appendEntries);
882
883         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
884         assertEquals("isSuccess", true, reply.isSuccess());
885         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
886         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
887         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
888
889         assertNotNull(follower.getSnapshotTracker());
890     }
891
892     @Test
893     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
894         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
895
896         MockRaftActorContext context = createActorContext();
897
898         follower = createBehavior(context);
899
900         ByteString bsSnapshot  = createSnapshot();
901         int snapshotLength = bsSnapshot.size();
902         int chunkSize = 50;
903         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
904         int lastIncludedIndex = 1;
905
906         // Check that snapshot installation is not in progress
907         assertNull(follower.getSnapshotTracker());
908
909         // Make sure that we have more than 1 chunk to send
910         assertTrue(totalChunks > 1);
911
912         // Send an install snapshot with the first chunk to start the process of installing a snapshot
913         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
914         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
915                 chunkData, 1, totalChunks));
916
917         // Check if snapshot installation is in progress now
918         assertNotNull(follower.getSnapshotTracker());
919
920         // Send appendEntries with a new term and leader.
921         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
922                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
923
924         follower.handleMessage(leaderActor, appendEntries);
925
926         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
927         assertEquals("isSuccess", true, reply.isSuccess());
928         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
929         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
930         assertEquals("getTerm", 2, reply.getTerm());
931
932         assertNull(follower.getSnapshotTracker());
933     }
934
935     @Test
936     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
937         logStart("testInitialSyncUpWithHandleInstallSnapshot");
938
939         MockRaftActorContext context = createActorContext();
940         context.setCommitIndex(-1);
941
942         follower = createBehavior(context);
943
944         ByteString bsSnapshot  = createSnapshot();
945         int offset = 0;
946         int snapshotLength = bsSnapshot.size();
947         int chunkSize = 50;
948         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
949         int lastIncludedIndex = 1;
950         int chunkIndex = 1;
951         InstallSnapshot lastInstallSnapshot = null;
952
953         for (int i = 0; i < totalChunks; i++) {
954             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
955             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
956                     chunkData, chunkIndex, totalChunks);
957             follower.handleMessage(leaderActor, lastInstallSnapshot);
958             offset = offset + 50;
959             lastIncludedIndex++;
960             chunkIndex++;
961         }
962
963         FollowerInitialSyncUpStatus syncStatus =
964                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
965
966         assertFalse(syncStatus.isInitialSyncDone());
967
968         // Clear all the messages
969         followerActor.underlyingActor().clear();
970
971         context.setLastApplied(101);
972         context.setCommitIndex(101);
973         setLastLogEntry(context, 1, 101,
974                 new MockRaftActorContext.MockPayload(""));
975
976         List<ReplicatedLogEntry> entries = Arrays.asList(
977                 newReplicatedLogEntry(2, 101, "foo"));
978
979         // The new commitIndex is 101
980         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
981         follower.handleMessage(leaderActor, appendEntries);
982
983         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
984
985         assertTrue(syncStatus.isInitialSyncDone());
986     }
987
988     @Test
989     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
990         logStart("testHandleOutOfSequenceInstallSnapshot");
991
992         MockRaftActorContext context = createActorContext();
993
994         follower = createBehavior(context);
995
996         ByteString bsSnapshot = createSnapshot();
997
998         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
999                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1000         follower.handleMessage(leaderActor, installSnapshot);
1001
1002         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1003                 InstallSnapshotReply.class);
1004
1005         assertEquals("isSuccess", false, reply.isSuccess());
1006         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1007         assertEquals("getTerm", 1, reply.getTerm());
1008         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1009
1010         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1011     }
1012
1013     @Test
1014     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1015         MockRaftActorContext context = createActorContext();
1016
1017         Stopwatch stopwatch = Stopwatch.createStarted();
1018
1019         follower = createBehavior(context);
1020
1021         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1022
1023         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1024
1025         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1026
1027         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1028         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1029     }
1030
1031     @Test
1032     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1033         MockRaftActorContext context = createActorContext();
1034         context.setConfigParams(new DefaultConfigParamsImpl() {
1035             @Override
1036             public FiniteDuration getElectionTimeOutInterval() {
1037                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1038             }
1039         });
1040
1041         context.setRaftPolicy(createRaftPolicy(false, false));
1042
1043         follower = createBehavior(context);
1044
1045         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1046         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1047         assertSame("handleMessage result", follower, newBehavior);
1048     }
1049
1050     @Test
1051     public void testFollowerSchedulesElectionIfNonVoting() {
1052         MockRaftActorContext context = createActorContext();
1053         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1054         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1055                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1056         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1057
1058         follower = new Follower(context, "leader", (short)1);
1059
1060         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1061                 ElectionTimeout.class);
1062         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1063         assertSame("handleMessage result", follower, newBehavior);
1064         assertNull("Expected null leaderId", follower.getLeaderId());
1065     }
1066
1067     @Test
1068     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1069         MockRaftActorContext context = createActorContext();
1070         follower = createBehavior(context);
1071         follower.handleMessage(leaderActor, new RaftRPC() {
1072             private static final long serialVersionUID = 1L;
1073
1074             @Override
1075             public long getTerm() {
1076                 return 100;
1077             }
1078         });
1079         verify(follower).scheduleElection(any(FiniteDuration.class));
1080     }
1081
1082     @Test
1083     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1084         MockRaftActorContext context = createActorContext();
1085         follower = createBehavior(context);
1086         follower.handleMessage(leaderActor, "non-raft-rpc");
1087         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1088     }
1089
1090     public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
1091         int snapshotLength = bs.size();
1092         int start = offset;
1093         int size = chunkSize;
1094         if (chunkSize > snapshotLength) {
1095             size = snapshotLength;
1096         } else {
1097             if (start + chunkSize > snapshotLength) {
1098                 size = snapshotLength - start;
1099             }
1100         }
1101
1102         byte[] nextChunk = new byte[size];
1103         bs.copyTo(nextChunk, start, 0, size);
1104         return nextChunk;
1105     }
1106
1107     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1108             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1109         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1110     }
1111
1112     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1113                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1114                                                    boolean expForceInstallSnapshot) {
1115
1116         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1117                 AppendEntriesReply.class);
1118
1119         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1120         assertEquals("getTerm", expTerm, reply.getTerm());
1121         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1122         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1123         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1124         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1125         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1126     }
1127
1128
1129     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1130         return new SimpleReplicatedLogEntry(index, term,
1131                 new MockRaftActorContext.MockPayload(data));
1132     }
1133
1134     private ByteString createSnapshot() {
1135         HashMap<String, String> followerSnapshot = new HashMap<>();
1136         followerSnapshot.put("1", "A");
1137         followerSnapshot.put("2", "B");
1138         followerSnapshot.put("3", "C");
1139
1140         return toByteString(followerSnapshot);
1141     }
1142
1143     @Override
1144     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1145             ActorRef actorRef, RaftRPC rpc) throws Exception {
1146         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1147
1148         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1149         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1150     }
1151
1152     @Override
1153     protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1154             throws Exception {
1155         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1156         assertEquals("isSuccess", true, reply.isSuccess());
1157     }
1158 }