Avoid unnecessary unsuccessful AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.dispatch.Dispatchers;
24 import akka.protobuf.ByteString;
25 import akka.testkit.TestActorRef;
26 import akka.testkit.javadsl.TestKit;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.io.ByteSource;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.io.OutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
48 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
56 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
64 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
65 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
71 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
72 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.duration.FiniteDuration;
77
78 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
79
    // Message collector standing in for the follower's own actor; createActorContext() hands it
    // to the MockRaftActorContext, and tests read FollowerInitialSyncUpStatus/TimeoutNow from it.
    private final ActorRef followerActor = actorFactory.createActor(
            MessageCollectorActor.props(), actorFactory.generateActorId("follower"));

    // Message collector passed as the sender to handleMessage(); tests read the follower's
    // replies (AppendEntriesReply, RequestVoteReply) from it.
    private final ActorRef leaderActor = actorFactory.createActor(
            MessageCollectorActor.props(), actorFactory.generateActorId("leader"));

    // Behavior under test; closed by tearDown() when a test has assigned it.
    private Follower follower;

    // Payload version set on every context built by createActorContext(ActorRef).
    private final short payloadVersion = 5;
89
90     @Override
91     @After
92     public void tearDown() {
93         if (follower != null) {
94             follower.close();
95         }
96
97         super.tearDown();
98     }
99
100     @Override
101     protected Follower createBehavior(final RaftActorContext actorContext) {
102         return spy(new Follower(actorContext));
103     }
104
105     @Override
106     protected  MockRaftActorContext createActorContext() {
107         return createActorContext(followerActor);
108     }
109
110     @Override
111     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
112         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
113         context.setPayloadVersion(payloadVersion);
114         return context;
115     }
116
117     @Test
118     public void testThatAnElectionTimeoutIsTriggered() {
119         MockRaftActorContext actorContext = createActorContext();
120         follower = new Follower(actorContext);
121
122         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
123                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
124     }
125
126     @Test
127     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
128         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
129
130         MockRaftActorContext context = createActorContext();
131         follower = new Follower(context);
132
133         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
134                 TimeUnit.MILLISECONDS);
135         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
136
137         assertTrue(raftBehavior instanceof Candidate);
138     }
139
140     @Test
141     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
142         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
143
144         MockRaftActorContext context = createActorContext();
145         ((DefaultConfigParamsImpl) context.getConfigParams())
146                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
147         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
148
149         follower = new Follower(context);
150         context.setCurrentBehavior(follower);
151
152         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
153                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
154         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
155                 -1, -1, (short) 1));
156
157         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
158         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
159         assertTrue(raftBehavior instanceof Follower);
160
161         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
162                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
163         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
164                 -1, -1, (short) 1));
165
166         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
167         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
168         assertTrue(raftBehavior instanceof Follower);
169     }
170
171     @Test
172     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
173         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
174
175         MockRaftActorContext context = createActorContext();
176         long term = 1000;
177         context.getTermInformation().update(term, null);
178
179         follower = createBehavior(context);
180
181         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
182
183         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
184
185         assertEquals("isVoteGranted", true, reply.isVoteGranted());
186         assertEquals("getTerm", term, reply.getTerm());
187         verify(follower).scheduleElection(any(FiniteDuration.class));
188     }
189
190     @Test
191     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
192         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
193
194         MockRaftActorContext context = createActorContext();
195         long term = 1000;
196         context.getTermInformation().update(term, "test");
197
198         follower = createBehavior(context);
199
200         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
201
202         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
203
204         assertEquals("isVoteGranted", false, reply.isVoteGranted());
205         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
206     }
207
208
209     @Test
210     public void testHandleFirstAppendEntries() {
211         logStart("testHandleFirstAppendEntries");
212
213         MockRaftActorContext context = createActorContext();
214         context.getReplicatedLog().clear(0,2);
215         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
216         context.getReplicatedLog().setSnapshotIndex(99);
217
218         List<ReplicatedLogEntry> entries = Arrays.asList(
219                 newReplicatedLogEntry(2, 101, "foo"));
220
221         Assert.assertEquals(1, context.getReplicatedLog().size());
222
223         // The new commitIndex is 101
224         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
225
226         follower = createBehavior(context);
227         follower.handleMessage(leaderActor, appendEntries);
228
229         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
230                 FollowerInitialSyncUpStatus.class);
231         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
232
233         assertFalse(syncStatus.isInitialSyncDone());
234         assertTrue("append entries reply should be true", reply.isSuccess());
235     }
236
237     @Test
238     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
239         logStart("testHandleFirstAppendEntries");
240
241         MockRaftActorContext context = createActorContext();
242
243         List<ReplicatedLogEntry> entries = Arrays.asList(
244                 newReplicatedLogEntry(2, 101, "foo"));
245
246         // The new commitIndex is 101
247         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
248
249         follower = createBehavior(context);
250         follower.handleMessage(leaderActor, appendEntries);
251
252         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
253                 FollowerInitialSyncUpStatus.class);
254         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
255
256         assertFalse(syncStatus.isInitialSyncDone());
257         assertFalse("append entries reply should be false", reply.isSuccess());
258     }
259
260     @Test
261     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
262         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
263
264         MockRaftActorContext context = createActorContext();
265         context.getReplicatedLog().clear(0,2);
266         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
267         context.getReplicatedLog().setSnapshotIndex(99);
268
269         List<ReplicatedLogEntry> entries = Arrays.asList(
270                 newReplicatedLogEntry(2, 101, "foo"));
271
272         // The new commitIndex is 101
273         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
274
275         follower = createBehavior(context);
276         follower.handleMessage(leaderActor, appendEntries);
277
278         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
279                 FollowerInitialSyncUpStatus.class);
280         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
281
282         assertFalse(syncStatus.isInitialSyncDone());
283         assertTrue("append entries reply should be true", reply.isSuccess());
284     }
285
286     @Test
287     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
288         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
289
290         MockRaftActorContext context = createActorContext();
291         context.getReplicatedLog().clear(0,2);
292         context.getReplicatedLog().setSnapshotIndex(100);
293
294         List<ReplicatedLogEntry> entries = Arrays.asList(
295                 newReplicatedLogEntry(2, 101, "foo"));
296
297         // The new commitIndex is 101
298         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
299
300         follower = createBehavior(context);
301         follower.handleMessage(leaderActor, appendEntries);
302
303         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
304                 FollowerInitialSyncUpStatus.class);
305         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
306
307         assertFalse(syncStatus.isInitialSyncDone());
308         assertTrue("append entries reply should be true", reply.isSuccess());
309     }
310
311     @Test
312     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
313         logStart(
314                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
315
316         MockRaftActorContext context = createActorContext();
317         context.getReplicatedLog().clear(0,2);
318         context.getReplicatedLog().setSnapshotIndex(100);
319
320         List<ReplicatedLogEntry> entries = Arrays.asList(
321                 newReplicatedLogEntry(2, 105, "foo"));
322
323         // The new commitIndex is 101
324         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
325
326         follower = createBehavior(context);
327         follower.handleMessage(leaderActor, appendEntries);
328
329         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
330                 FollowerInitialSyncUpStatus.class);
331         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
332
333         assertFalse(syncStatus.isInitialSyncDone());
334         assertFalse("append entries reply should be false", reply.isSuccess());
335     }
336
337     @Test
338     public void testHandleSyncUpAppendEntries() {
339         logStart("testHandleSyncUpAppendEntries");
340
341         MockRaftActorContext context = createActorContext();
342
343         List<ReplicatedLogEntry> entries = Arrays.asList(
344                 newReplicatedLogEntry(2, 101, "foo"));
345
346         // The new commitIndex is 101
347         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
348
349         follower = createBehavior(context);
350         follower.handleMessage(leaderActor, appendEntries);
351
352         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
353                 FollowerInitialSyncUpStatus.class);
354
355         assertFalse(syncStatus.isInitialSyncDone());
356
357         // Clear all the messages
358         MessageCollectorActor.clearMessages(followerActor);
359
360         context.setLastApplied(101);
361         context.setCommitIndex(101);
362         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
363
364         entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
365
366         // The new commitIndex is 101
367         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
368         follower.handleMessage(leaderActor, appendEntries);
369
370         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
371
372         assertTrue(syncStatus.isInitialSyncDone());
373
374         MessageCollectorActor.clearMessages(followerActor);
375
376         // Sending the same message again should not generate another message
377
378         follower.handleMessage(leaderActor, appendEntries);
379
380         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
381
382         assertNull(syncStatus);
383     }
384
385     @Test
386     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
387         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
388
389         MockRaftActorContext context = createActorContext();
390
391         List<ReplicatedLogEntry> entries = Arrays.asList(
392                 newReplicatedLogEntry(2, 101, "foo"));
393
394         // The new commitIndex is 101
395         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
396
397         follower = createBehavior(context);
398         follower.handleMessage(leaderActor, appendEntries);
399
400         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
401                 FollowerInitialSyncUpStatus.class);
402
403         assertFalse(syncStatus.isInitialSyncDone());
404
405         // Clear all the messages
406         MessageCollectorActor.clearMessages(followerActor);
407
408         context.setLastApplied(100);
409         setLastLogEntry(context, 1, 100,
410                 new MockRaftActorContext.MockPayload(""));
411
412         entries = Arrays.asList(
413                 newReplicatedLogEntry(2, 101, "foo"));
414
415         // leader-2 is becoming the leader now and it says the commitIndex is 45
416         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
417         follower.handleMessage(leaderActor, appendEntries);
418
419         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
420
421         // We get a new message saying initial status is not done
422         assertFalse(syncStatus.isInitialSyncDone());
423     }
424
425     @Test
426     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
427         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
428
429         MockRaftActorContext context = createActorContext();
430
431         List<ReplicatedLogEntry> entries = Arrays.asList(
432                 newReplicatedLogEntry(2, 101, "foo"));
433
434         // The new commitIndex is 101
435         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
436
437         follower = createBehavior(context);
438         follower.handleMessage(leaderActor, appendEntries);
439
440         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
441                 FollowerInitialSyncUpStatus.class);
442
443         assertFalse(syncStatus.isInitialSyncDone());
444
445         // Clear all the messages
446         MessageCollectorActor.clearMessages(followerActor);
447
448         context.setLastApplied(101);
449         context.setCommitIndex(101);
450         setLastLogEntry(context, 1, 101,
451                 new MockRaftActorContext.MockPayload(""));
452
453         entries = Arrays.asList(
454                 newReplicatedLogEntry(2, 101, "foo"));
455
456         // The new commitIndex is 101
457         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
458         follower.handleMessage(leaderActor, appendEntries);
459
460         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
461
462         assertTrue(syncStatus.isInitialSyncDone());
463
464         // Clear all the messages
465         MessageCollectorActor.clearMessages(followerActor);
466
467         context.setLastApplied(100);
468         setLastLogEntry(context, 1, 100,
469                 new MockRaftActorContext.MockPayload(""));
470
471         entries = Arrays.asList(
472                 newReplicatedLogEntry(2, 101, "foo"));
473
474         // leader-2 is becoming the leader now and it says the commitIndex is 45
475         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
476         follower.handleMessage(leaderActor, appendEntries);
477
478         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
479
480         // We get a new message saying initial status is not done
481         assertFalse(syncStatus.isInitialSyncDone());
482     }
483
484     /**
485      * This test verifies that when an AppendEntries RPC is received by a RaftActor
486      * with a commitIndex that is greater than what has been applied to the
487      * state machine of the RaftActor, the RaftActor applies the state and
488      * sets it current applied state to the commitIndex of the sender.
489      */
490     @Test
491     public void testHandleAppendEntriesWithNewerCommitIndex() {
492         logStart("testHandleAppendEntriesWithNewerCommitIndex");
493
494         MockRaftActorContext context = createActorContext();
495
496         context.setLastApplied(100);
497         setLastLogEntry(context, 1, 100,
498                 new MockRaftActorContext.MockPayload(""));
499         context.getReplicatedLog().setSnapshotIndex(99);
500
501         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
502                 newReplicatedLogEntry(2, 101, "foo"));
503
504         // The new commitIndex is 101
505         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
506
507         follower = createBehavior(context);
508         follower.handleMessage(leaderActor, appendEntries);
509
510         assertEquals("getLastApplied", 101L, context.getLastApplied());
511     }
512
513     /**
514      * This test verifies that when an AppendEntries is received with a prevLogTerm
515      * which does not match the term that is in RaftActors log entry at prevLogIndex
516      * then the RaftActor does not change it's state and it returns a failure.
517      */
518     @Test
519     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
520         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
521
522         MockRaftActorContext context = createActorContext();
523
524         AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
525
526         follower = createBehavior(context);
527
528         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
529
530         Assert.assertSame(follower, newBehavior);
531
532         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
533                 AppendEntriesReply.class);
534
535         assertEquals("isSuccess", false, reply.isSuccess());
536     }
537
538     @Test
539     public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
540         logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
541
542         MockRaftActorContext context = createActorContext();
543         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
544         context.getReplicatedLog().setSnapshotIndex(4);
545         context.getReplicatedLog().setSnapshotTerm(3);
546
547         AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
548
549         follower = createBehavior(context);
550
551         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
552
553         Assert.assertSame(follower, newBehavior);
554
555         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
556
557         assertEquals("isSuccess", true, reply.isSuccess());
558     }
559
560     /**
561      * This test verifies that when a new AppendEntries message is received with
562      * new entries and the logs of the sender and receiver match that the new
563      * entries get added to the log and the log is incremented by the number of
564      * entries received in appendEntries.
565      */
566     @Test
567     public void testHandleAppendEntriesAddNewEntries() {
568         logStart("testHandleAppendEntriesAddNewEntries");
569
570         MockRaftActorContext context = createActorContext();
571
572         // First set the receivers term to lower number
573         context.getTermInformation().update(1, "test");
574
575         // Prepare the receivers log
576         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
577         log.append(newReplicatedLogEntry(1, 0, "zero"));
578         log.append(newReplicatedLogEntry(1, 1, "one"));
579         log.append(newReplicatedLogEntry(1, 2, "two"));
580
581         context.setReplicatedLog(log);
582
583         // Prepare the entries to be sent with AppendEntries
584         List<ReplicatedLogEntry> entries = new ArrayList<>();
585         entries.add(newReplicatedLogEntry(1, 3, "three"));
586         entries.add(newReplicatedLogEntry(1, 4, "four"));
587
588         // Send appendEntries with the same term as was set on the receiver
589         // before the new behavior was created (1 in this case)
590         // This will not work for a Candidate because as soon as a Candidate
591         // is created it increments the term
592         short leaderPayloadVersion = 10;
593         String leaderId = "leader-1";
594         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
595
596         follower = createBehavior(context);
597
598         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
599
600         Assert.assertSame(follower, newBehavior);
601
602         assertEquals("Next index", 5, log.last().getIndex() + 1);
603         assertEquals("Entry 3", entries.get(0), log.get(3));
604         assertEquals("Entry 4", entries.get(1), log.get(4));
605
606         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
607         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
608
609         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
610     }
611
612     /**
613      * This test verifies that when a new AppendEntries message is received with
614      * new entries and the logs of the sender and receiver are out-of-sync that
615      * the log is first corrected by removing the out of sync entries from the
616      * log and then adding in the new entries sent with the AppendEntries message.
617      */
618     @Test
619     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
620         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
621
622         MockRaftActorContext context = createActorContext();
623
624         // First set the receivers term to lower number
625         context.getTermInformation().update(1, "test");
626
627         // Prepare the receivers log
628         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
629         log.append(newReplicatedLogEntry(1, 0, "zero"));
630         log.append(newReplicatedLogEntry(1, 1, "one"));
631         log.append(newReplicatedLogEntry(1, 2, "two"));
632
633         context.setReplicatedLog(log);
634
635         // Prepare the entries to be sent with AppendEntries
636         List<ReplicatedLogEntry> entries = new ArrayList<>();
637         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
638         entries.add(newReplicatedLogEntry(2, 3, "three"));
639
640         // Send appendEntries with the same term as was set on the receiver
641         // before the new behavior was created (1 in this case)
642         // This will not work for a Candidate because as soon as a Candidate
643         // is created it increments the term
644         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
645
646         follower = createBehavior(context);
647
648         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
649
650         Assert.assertSame(follower, newBehavior);
651
652         // The entry at index 2 will be found out-of-sync with the leader
653         // and will be removed
654         // Then the two new entries will be added to the log
655         // Thus making the log to have 4 entries
656         assertEquals("Next index", 4, log.last().getIndex() + 1);
657         //assertEquals("Entry 2", entries.get(0), log.get(2));
658
659         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
660
661         // Check that the entry at index 2 has the new data
662         assertEquals("Entry 2", entries.get(0), log.get(2));
663
664         assertEquals("Entry 3", entries.get(1), log.get(3));
665
666         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
667     }
668
669     @Test
670     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
671         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
672
673         MockRaftActorContext context = createActorContext();
674
675         // First set the receivers term to lower number
676         context.getTermInformation().update(1, "test");
677
678         // Prepare the receivers log
679         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
680         log.append(newReplicatedLogEntry(1, 0, "zero"));
681         log.append(newReplicatedLogEntry(1, 1, "one"));
682         log.append(newReplicatedLogEntry(1, 2, "two"));
683
684         context.setReplicatedLog(log);
685
686         // Prepare the entries to be sent with AppendEntries
687         List<ReplicatedLogEntry> entries = new ArrayList<>();
688         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
689         entries.add(newReplicatedLogEntry(2, 3, "three"));
690
691         // Send appendEntries with the same term as was set on the receiver
692         // before the new behavior was created (1 in this case)
693         // This will not work for a Candidate because as soon as a Candidate
694         // is created it increments the term
695         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
696
697         context.setRaftPolicy(createRaftPolicy(false, true));
698         follower = createBehavior(context);
699
700         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
701
702         Assert.assertSame(follower, newBehavior);
703
704         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
705     }
706
707     @Test
708     public void testHandleAppendEntriesPreviousLogEntryMissing() {
709         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
710
711         final MockRaftActorContext context = createActorContext();
712
713         // Prepare the receivers log
714         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
715         log.append(newReplicatedLogEntry(1, 0, "zero"));
716         log.append(newReplicatedLogEntry(1, 1, "one"));
717         log.append(newReplicatedLogEntry(1, 2, "two"));
718
719         context.setReplicatedLog(log);
720
721         // Prepare the entries to be sent with AppendEntries
722         List<ReplicatedLogEntry> entries = new ArrayList<>();
723         entries.add(newReplicatedLogEntry(1, 4, "four"));
724
725         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
726
727         follower = createBehavior(context);
728
729         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
730
731         Assert.assertSame(follower, newBehavior);
732
733         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
734     }
735
736     @Test
737     public void testHandleAppendEntriesWithExistingLogEntry() {
738         logStart("testHandleAppendEntriesWithExistingLogEntry");
739
740         MockRaftActorContext context = createActorContext();
741
742         context.getTermInformation().update(1, "test");
743
744         // Prepare the receivers log
745         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
746         log.append(newReplicatedLogEntry(1, 0, "zero"));
747         log.append(newReplicatedLogEntry(1, 1, "one"));
748
749         context.setReplicatedLog(log);
750
751         // Send the last entry again.
752         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
753
754         follower = createBehavior(context);
755
756         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
757
758         assertEquals("Next index", 2, log.last().getIndex() + 1);
759         assertEquals("Entry 1", entries.get(0), log.get(1));
760
761         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
762
763         // Send the last entry again and also a new one.
764
765         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
766
767         MessageCollectorActor.clearMessages(leaderActor);
768         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
769
770         assertEquals("Next index", 3, log.last().getIndex() + 1);
771         assertEquals("Entry 1", entries.get(0), log.get(1));
772         assertEquals("Entry 2", entries.get(1), log.get(2));
773
774         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
775     }
776
777     @Test
778     public void testHandleAppendEntriesAfterInstallingSnapshot() {
779         logStart("testHandleAppendAfterInstallingSnapshot");
780
781         MockRaftActorContext context = createActorContext();
782
783         // Prepare the receivers log
784         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
785
786         // Set up a log as if it has been snapshotted
787         log.setSnapshotIndex(3);
788         log.setSnapshotTerm(1);
789
790         context.setReplicatedLog(log);
791
792         // Prepare the entries to be sent with AppendEntries
793         List<ReplicatedLogEntry> entries = new ArrayList<>();
794         entries.add(newReplicatedLogEntry(1, 4, "four"));
795
796         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
797
798         follower = createBehavior(context);
799
800         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
801
802         Assert.assertSame(follower, newBehavior);
803
804         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
805     }
806
807
808     /**
809      * This test verifies that when InstallSnapshot is received by
810      * the follower its applied correctly.
811      */
812     @Test
813     public void testHandleInstallSnapshot() {
814         logStart("testHandleInstallSnapshot");
815
816         MockRaftActorContext context = createActorContext();
817         context.getTermInformation().update(1, "leader");
818
819         follower = createBehavior(context);
820
821         ByteString bsSnapshot = createSnapshot();
822         int offset = 0;
823         int snapshotLength = bsSnapshot.size();
824         int chunkSize = 50;
825         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
826         int lastIncludedIndex = 1;
827         int chunkIndex = 1;
828         InstallSnapshot lastInstallSnapshot = null;
829
830         for (int i = 0; i < totalChunks; i++) {
831             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
832             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
833                     chunkData, chunkIndex, totalChunks);
834             follower.handleMessage(leaderActor, lastInstallSnapshot);
835             offset = offset + 50;
836             lastIncludedIndex++;
837             chunkIndex++;
838         }
839
840         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
841                 ApplySnapshot.class);
842         Snapshot snapshot = applySnapshot.getSnapshot();
843         assertNotNull(lastInstallSnapshot);
844         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
845         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
846                 snapshot.getLastAppliedTerm());
847         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
848                 snapshot.getLastAppliedIndex());
849         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
850         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
851         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
852         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
853         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
854         applySnapshot.getCallback().onSuccess();
855
856         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
857                 leaderActor, InstallSnapshotReply.class);
858         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
859
860         chunkIndex = 1;
861         for (InstallSnapshotReply reply: replies) {
862             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
863             assertEquals("getTerm", 1, reply.getTerm());
864             assertEquals("isSuccess", true, reply.isSuccess());
865             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
866         }
867
868         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
869     }
870
871     /**
872      * Verify that when an AppendEntries is sent to a follower during a snapshot install
873      * the Follower short-circuits the processing of the AppendEntries message.
874      */
875     @Test
876     public void testReceivingAppendEntriesDuringInstallSnapshot() {
877         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
878
879         MockRaftActorContext context = createActorContext();
880
881         follower = createBehavior(context);
882
883         ByteString bsSnapshot  = createSnapshot();
884         int snapshotLength = bsSnapshot.size();
885         int chunkSize = 50;
886         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
887         int lastIncludedIndex = 1;
888
889         // Check that snapshot installation is not in progress
890         assertNull(follower.getSnapshotTracker());
891
892         // Make sure that we have more than 1 chunk to send
893         assertTrue(totalChunks > 1);
894
895         // Send an install snapshot with the first chunk to start the process of installing a snapshot
896         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
897         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
898                 chunkData, 1, totalChunks));
899
900         // Check if snapshot installation is in progress now
901         assertNotNull(follower.getSnapshotTracker());
902
903         // Send an append entry
904         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
905                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
906
907         follower.handleMessage(leaderActor, appendEntries);
908
909         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
910         assertEquals("isSuccess", true, reply.isSuccess());
911         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
912         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
913         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
914
915         assertNotNull(follower.getSnapshotTracker());
916     }
917
918     @Test
919     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
920         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
921
922         MockRaftActorContext context = createActorContext();
923
924         follower = createBehavior(context);
925
926         ByteString bsSnapshot  = createSnapshot();
927         int snapshotLength = bsSnapshot.size();
928         int chunkSize = 50;
929         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
930         int lastIncludedIndex = 1;
931
932         // Check that snapshot installation is not in progress
933         assertNull(follower.getSnapshotTracker());
934
935         // Make sure that we have more than 1 chunk to send
936         assertTrue(totalChunks > 1);
937
938         // Send an install snapshot with the first chunk to start the process of installing a snapshot
939         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
940         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
941                 chunkData, 1, totalChunks));
942
943         // Check if snapshot installation is in progress now
944         assertNotNull(follower.getSnapshotTracker());
945
946         // Send appendEntries with a new term and leader.
947         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
948                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
949
950         follower.handleMessage(leaderActor, appendEntries);
951
952         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
953         assertEquals("isSuccess", true, reply.isSuccess());
954         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
955         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
956         assertEquals("getTerm", 2, reply.getTerm());
957
958         assertNull(follower.getSnapshotTracker());
959     }
960
961     @Test
962     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
963         logStart("testInitialSyncUpWithHandleInstallSnapshot");
964
965         MockRaftActorContext context = createActorContext();
966         context.setCommitIndex(-1);
967
968         follower = createBehavior(context);
969
970         ByteString bsSnapshot  = createSnapshot();
971         int offset = 0;
972         int snapshotLength = bsSnapshot.size();
973         int chunkSize = 50;
974         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
975         int lastIncludedIndex = 1;
976         int chunkIndex = 1;
977         InstallSnapshot lastInstallSnapshot = null;
978
979         for (int i = 0; i < totalChunks; i++) {
980             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
981             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
982                     chunkData, chunkIndex, totalChunks);
983             follower.handleMessage(leaderActor, lastInstallSnapshot);
984             offset = offset + 50;
985             lastIncludedIndex++;
986             chunkIndex++;
987         }
988
989         FollowerInitialSyncUpStatus syncStatus =
990                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
991
992         assertFalse(syncStatus.isInitialSyncDone());
993
994         // Clear all the messages
995         MessageCollectorActor.clearMessages(followerActor);
996
997         context.setLastApplied(101);
998         context.setCommitIndex(101);
999         setLastLogEntry(context, 1, 101,
1000                 new MockRaftActorContext.MockPayload(""));
1001
1002         List<ReplicatedLogEntry> entries = Arrays.asList(
1003                 newReplicatedLogEntry(2, 101, "foo"));
1004
1005         // The new commitIndex is 101
1006         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
1007         follower.handleMessage(leaderActor, appendEntries);
1008
1009         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1010
1011         assertTrue(syncStatus.isInitialSyncDone());
1012     }
1013
1014     @Test
1015     public void testHandleOutOfSequenceInstallSnapshot() {
1016         logStart("testHandleOutOfSequenceInstallSnapshot");
1017
1018         MockRaftActorContext context = createActorContext();
1019
1020         follower = createBehavior(context);
1021
1022         ByteString bsSnapshot = createSnapshot();
1023
1024         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1025                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1026         follower.handleMessage(leaderActor, installSnapshot);
1027
1028         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1029                 InstallSnapshotReply.class);
1030
1031         assertEquals("isSuccess", false, reply.isSuccess());
1032         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1033         assertEquals("getTerm", 1, reply.getTerm());
1034         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1035
1036         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1037     }
1038
1039     @Test
1040     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1041         MockRaftActorContext context = createActorContext();
1042
1043         Stopwatch stopwatch = Stopwatch.createStarted();
1044
1045         follower = createBehavior(context);
1046
1047         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1048
1049         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1050
1051         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1052
1053         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1054         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1055     }
1056
1057     @Test
1058     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1059         MockRaftActorContext context = createActorContext();
1060         context.setConfigParams(new DefaultConfigParamsImpl() {
1061             @Override
1062             public FiniteDuration getElectionTimeOutInterval() {
1063                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1064             }
1065         });
1066
1067         context.setRaftPolicy(createRaftPolicy(false, false));
1068
1069         follower = createBehavior(context);
1070
1071         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1072         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1073         assertSame("handleMessage result", follower, newBehavior);
1074     }
1075
1076     @Test
1077     public void testFollowerSchedulesElectionIfNonVoting() {
1078         MockRaftActorContext context = createActorContext();
1079         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1080         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1081                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1082         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1083
1084         follower = new Follower(context, "leader", (short)1);
1085
1086         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1087                 ElectionTimeout.class);
1088         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1089         assertSame("handleMessage result", follower, newBehavior);
1090         assertNull("Expected null leaderId", follower.getLeaderId());
1091     }
1092
1093     @Test
1094     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1095         MockRaftActorContext context = createActorContext();
1096         follower = createBehavior(context);
1097         follower.handleMessage(leaderActor, new RaftRPC() {
1098             private static final long serialVersionUID = 1L;
1099
1100             @Override
1101             public long getTerm() {
1102                 return 100;
1103             }
1104         });
1105         verify(follower).scheduleElection(any(FiniteDuration.class));
1106     }
1107
1108     @Test
1109     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1110         MockRaftActorContext context = createActorContext();
1111         follower = createBehavior(context);
1112         follower.handleMessage(leaderActor, "non-raft-rpc");
1113         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1114     }
1115
1116     @Test
1117     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1118         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1119         logStart(id);
1120
1121         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1122
1123         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1124         config.setSnapshotBatchCount(2);
1125         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1126
1127         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1128         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1129         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1130                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1131         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1132                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1133         followerRaftActor.set(followerActorRef.underlyingActor());
1134         followerRaftActor.get().waitForInitializeBehaviorComplete();
1135
1136         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1137         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1138         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1139
1140         List<ReplicatedLogEntry> entries = Arrays.asList(
1141                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1142
1143         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1144
1145         followerActorRef.tell(appendEntries, leaderActor);
1146
1147         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1148         assertEquals("isSuccess", true, reply.isSuccess());
1149
1150         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1151
1152         InMemoryJournal.waitForDeleteMessagesComplete(id);
1153         InMemoryJournal.waitForWriteMessagesComplete(id);
1154         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1155         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1156         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1157         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1158         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1159         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1160         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1161
1162         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1163         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1164         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1165         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1166         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1167         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1168                 MockRaftActor.fromState(snapshot.getState()));
1169     }
1170
1171     @Test
1172     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1173         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1174         logStart(id);
1175
1176         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1177
1178         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1179         config.setSnapshotBatchCount(2);
1180         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1181
1182         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1183         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1184         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1185                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1186         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1187                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1188         followerRaftActor.set(followerActorRef.underlyingActor());
1189         followerRaftActor.get().waitForInitializeBehaviorComplete();
1190
1191         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1192         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1193         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1194
1195         List<ReplicatedLogEntry> entries = Arrays.asList(
1196                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1197                 newReplicatedLogEntry(1, 2, "three"));
1198
1199         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1200
1201         followerActorRef.tell(appendEntries, leaderActor);
1202
1203         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1204         assertEquals("isSuccess", true, reply.isSuccess());
1205
1206         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1207
1208         InMemoryJournal.waitForDeleteMessagesComplete(id);
1209         InMemoryJournal.waitForWriteMessagesComplete(id);
1210         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1211         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1212         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1213         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1214         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1215         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1216         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1217
1218         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1219         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1220         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1221         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1222         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1223         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1224                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1225
1226         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1227         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1228
1229         // Reinstate the actor from persistence
1230
1231         actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1232
1233         followerActorRef = actorFactory.createTestActor(builder.props()
1234                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1235         followerRaftActor.set(followerActorRef.underlyingActor());
1236         followerRaftActor.get().waitForInitializeBehaviorComplete();
1237
1238         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1239         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1240         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1241         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1242         assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1243                 entries.get(2).getData()), followerRaftActor.get().getState());
1244     }
1245
1246     @Test
1247     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1248         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1249         logStart(id);
1250
1251         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1252
1253         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1254         config.setSnapshotBatchCount(1);
1255         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1256
1257         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1258         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1259         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1260                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1261         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1262                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1263         followerRaftActor.set(followerActorRef.underlyingActor());
1264         followerRaftActor.get().waitForInitializeBehaviorComplete();
1265
1266         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1267         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1268         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1269
1270         List<ReplicatedLogEntry> entries = Arrays.asList(
1271                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1272                 newReplicatedLogEntry(1, 2, "three"));
1273
1274         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1275
1276         followerActorRef.tell(appendEntries, leaderActor);
1277
1278         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1279         assertEquals("isSuccess", true, reply.isSuccess());
1280
1281         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1282
1283         InMemoryJournal.waitForDeleteMessagesComplete(id);
1284         InMemoryJournal.waitForWriteMessagesComplete(id);
1285         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1286         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1287         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1288         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1289         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1290         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1291         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1292
1293         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1294         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1295         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1296         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1297         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1298         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1299         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1300         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1301                 MockRaftActor.fromState(snapshot.getState()));
1302     }
1303
1304     @SuppressWarnings("checkstyle:IllegalCatch")
1305     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1306             final AtomicReference<MockRaftActor> followerRaftActor) {
1307         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1308             @Override
1309             public void createSnapshot(final ActorRef actorRef,
1310                     final java.util.Optional<OutputStream> installSnapshotStream) {
1311                 try {
1312                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1313                             installSnapshotStream), actorRef);
1314                 } catch (RuntimeException e) {
1315                     throw e;
1316                 } catch (Exception e) {
1317                     throw new RuntimeException(e);
1318                 }
1319             }
1320
1321             @Override
1322             public void applySnapshot(final State snapshotState) {
1323             }
1324
1325             @Override
1326             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1327                 throw new UnsupportedOperationException();
1328             }
1329         };
1330         return snapshotCohort;
1331     }
1332
1333     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1334         int snapshotLength = bs.size();
1335         int start = offset;
1336         int size = chunkSize;
1337         if (chunkSize > snapshotLength) {
1338             size = snapshotLength;
1339         } else {
1340             if (start + chunkSize > snapshotLength) {
1341                 size = snapshotLength - start;
1342             }
1343         }
1344
1345         byte[] nextChunk = new byte[size];
1346         bs.copyTo(nextChunk, start, 0, size);
1347         return nextChunk;
1348     }
1349
1350     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1351             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1352         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1353     }
1354
1355     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1356             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1357             final boolean expForceInstallSnapshot) {
1358
1359         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1360                 AppendEntriesReply.class);
1361
1362         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1363         assertEquals("getTerm", expTerm, reply.getTerm());
1364         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1365         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1366         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1367         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1368         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1369     }
1370
1371
1372     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1373         return new SimpleReplicatedLogEntry(index, term,
1374                 new MockRaftActorContext.MockPayload(data));
1375     }
1376
1377     private ByteString createSnapshot() {
1378         HashMap<String, String> followerSnapshot = new HashMap<>();
1379         followerSnapshot.put("1", "A");
1380         followerSnapshot.put("2", "B");
1381         followerSnapshot.put("3", "C");
1382
1383         return toByteString(followerSnapshot);
1384     }
1385
1386     @Override
1387     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1388             final ActorRef actorRef, final RaftRPC rpc) {
1389         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1390
1391         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1392         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1393     }
1394
1395     @Override
1396     protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1397         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1398         assertEquals("isSuccess", true, reply.isSuccess());
1399     }
1400 }