c4d97ea0e56a4f6259e7cb48f2f5eac1577795de
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Stopwatch;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import com.google.protobuf.ByteString;
33 import java.io.OutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
48 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorContext;
50 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
56 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
57 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
59 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
62 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
64 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
65 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
70 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
71 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
72 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.duration.FiniteDuration;
77
78 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
79
80     private final ActorRef followerActor = actorFactory.createActor(
81             MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
82
83     private final ActorRef leaderActor = actorFactory.createActor(
84             MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
85
86     private Follower follower;
87
88     private final short payloadVersion = 5;
89
90     @Override
91     @After
92     public void tearDown() {
93         if (follower != null) {
94             follower.close();
95         }
96
97         super.tearDown();
98     }
99
100     @Override
101     protected Follower createBehavior(final RaftActorContext actorContext) {
102         return spy(new Follower(actorContext));
103     }
104
105     @Override
106     protected  MockRaftActorContext createActorContext() {
107         return createActorContext(followerActor);
108     }
109
110     @Override
111     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
112         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
113         context.setPayloadVersion(payloadVersion);
114         return context;
115     }
116
117     @Test
118     public void testThatAnElectionTimeoutIsTriggered() {
119         MockRaftActorContext actorContext = createActorContext();
120         follower = new Follower(actorContext);
121
122         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
123                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
124     }
125
126     @Test
127     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
128         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
129
130         MockRaftActorContext context = createActorContext();
131         follower = new Follower(context);
132
133         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
134                 TimeUnit.MILLISECONDS);
135         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
136
137         assertTrue(raftBehavior instanceof Candidate);
138     }
139
140     @Test
141     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
142         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
143
144         MockRaftActorContext context = createActorContext();
145         ((DefaultConfigParamsImpl) context.getConfigParams())
146                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
147         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
148
149         follower = new Follower(context);
150         context.setCurrentBehavior(follower);
151
152         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
153                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
154         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
155                 -1, -1, (short) 1));
156
157         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
158         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
159         assertTrue(raftBehavior instanceof Follower);
160
161         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
162                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
163         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
164                 -1, -1, (short) 1));
165
166         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
167         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
168         assertTrue(raftBehavior instanceof Follower);
169     }
170
171     @Test
172     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
173         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
174
175         MockRaftActorContext context = createActorContext();
176         long term = 1000;
177         context.getTermInformation().update(term, null);
178
179         follower = createBehavior(context);
180
181         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
182
183         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
184
185         assertEquals("isVoteGranted", true, reply.isVoteGranted());
186         assertEquals("getTerm", term, reply.getTerm());
187         verify(follower).scheduleElection(any(FiniteDuration.class));
188     }
189
190     @Test
191     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
192         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
193
194         MockRaftActorContext context = createActorContext();
195         long term = 1000;
196         context.getTermInformation().update(term, "test");
197
198         follower = createBehavior(context);
199
200         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
201
202         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
203
204         assertEquals("isVoteGranted", false, reply.isVoteGranted());
205         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
206     }
207
208
209     @Test
210     public void testHandleFirstAppendEntries() {
211         logStart("testHandleFirstAppendEntries");
212
213         MockRaftActorContext context = createActorContext();
214         context.getReplicatedLog().clear(0,2);
215         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
216         context.getReplicatedLog().setSnapshotIndex(99);
217
218         List<ReplicatedLogEntry> entries = Arrays.asList(
219                 newReplicatedLogEntry(2, 101, "foo"));
220
221         Assert.assertEquals(1, context.getReplicatedLog().size());
222
223         // The new commitIndex is 101
224         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
225
226         follower = createBehavior(context);
227         follower.handleMessage(leaderActor, appendEntries);
228
229         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
230                 FollowerInitialSyncUpStatus.class);
231         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
232
233         assertFalse(syncStatus.isInitialSyncDone());
234         assertTrue("append entries reply should be true", reply.isSuccess());
235     }
236
237     @Test
238     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
239         logStart("testHandleFirstAppendEntries");
240
241         MockRaftActorContext context = createActorContext();
242
243         List<ReplicatedLogEntry> entries = Arrays.asList(
244                 newReplicatedLogEntry(2, 101, "foo"));
245
246         // The new commitIndex is 101
247         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
248
249         follower = createBehavior(context);
250         follower.handleMessage(leaderActor, appendEntries);
251
252         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
253                 FollowerInitialSyncUpStatus.class);
254         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
255
256         assertFalse(syncStatus.isInitialSyncDone());
257         assertFalse("append entries reply should be false", reply.isSuccess());
258     }
259
260     @Test
261     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
262         logStart("testHandleFirstAppendEntries");
263
264         MockRaftActorContext context = createActorContext();
265         context.getReplicatedLog().clear(0,2);
266         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
267         context.getReplicatedLog().setSnapshotIndex(99);
268
269         List<ReplicatedLogEntry> entries = Arrays.asList(
270                 newReplicatedLogEntry(2, 101, "foo"));
271
272         // The new commitIndex is 101
273         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
274
275         follower = createBehavior(context);
276         follower.handleMessage(leaderActor, appendEntries);
277
278         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
279                 FollowerInitialSyncUpStatus.class);
280         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
281
282         assertFalse(syncStatus.isInitialSyncDone());
283         assertTrue("append entries reply should be true", reply.isSuccess());
284     }
285
286     @Test
287     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
288         logStart("testHandleFirstAppendEntries");
289
290         MockRaftActorContext context = createActorContext();
291         context.getReplicatedLog().clear(0,2);
292         context.getReplicatedLog().setSnapshotIndex(100);
293
294         List<ReplicatedLogEntry> entries = Arrays.asList(
295                 newReplicatedLogEntry(2, 101, "foo"));
296
297         // The new commitIndex is 101
298         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
299
300         follower = createBehavior(context);
301         follower.handleMessage(leaderActor, appendEntries);
302
303         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
304                 FollowerInitialSyncUpStatus.class);
305         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
306
307         assertFalse(syncStatus.isInitialSyncDone());
308         assertTrue("append entries reply should be true", reply.isSuccess());
309     }
310
311     @Test
312     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
313         logStart(
314                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
315
316         MockRaftActorContext context = createActorContext();
317         context.getReplicatedLog().clear(0,2);
318         context.getReplicatedLog().setSnapshotIndex(100);
319
320         List<ReplicatedLogEntry> entries = Arrays.asList(
321                 newReplicatedLogEntry(2, 105, "foo"));
322
323         // The new commitIndex is 101
324         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
325
326         follower = createBehavior(context);
327         follower.handleMessage(leaderActor, appendEntries);
328
329         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
330                 FollowerInitialSyncUpStatus.class);
331         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
332
333         assertFalse(syncStatus.isInitialSyncDone());
334         assertFalse("append entries reply should be false", reply.isSuccess());
335     }
336
337     @Test
338     public void testHandleSyncUpAppendEntries() {
339         logStart("testHandleSyncUpAppendEntries");
340
341         MockRaftActorContext context = createActorContext();
342
343         List<ReplicatedLogEntry> entries = Arrays.asList(
344                 newReplicatedLogEntry(2, 101, "foo"));
345
346         // The new commitIndex is 101
347         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
348
349         follower = createBehavior(context);
350         follower.handleMessage(leaderActor, appendEntries);
351
352         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
353                 FollowerInitialSyncUpStatus.class);
354
355         assertFalse(syncStatus.isInitialSyncDone());
356
357         // Clear all the messages
358         MessageCollectorActor.clearMessages(followerActor);
359
360         context.setLastApplied(101);
361         context.setCommitIndex(101);
362         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
363
364         entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
365
366         // The new commitIndex is 101
367         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
368         follower.handleMessage(leaderActor, appendEntries);
369
370         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
371
372         assertTrue(syncStatus.isInitialSyncDone());
373
374         MessageCollectorActor.clearMessages(followerActor);
375
376         // Sending the same message again should not generate another message
377
378         follower.handleMessage(leaderActor, appendEntries);
379
380         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
381
382         assertNull(syncStatus);
383     }
384
385     @Test
386     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
387         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
388
389         MockRaftActorContext context = createActorContext();
390
391         List<ReplicatedLogEntry> entries = Arrays.asList(
392                 newReplicatedLogEntry(2, 101, "foo"));
393
394         // The new commitIndex is 101
395         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
396
397         follower = createBehavior(context);
398         follower.handleMessage(leaderActor, appendEntries);
399
400         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
401                 FollowerInitialSyncUpStatus.class);
402
403         assertFalse(syncStatus.isInitialSyncDone());
404
405         // Clear all the messages
406         MessageCollectorActor.clearMessages(followerActor);
407
408         context.setLastApplied(100);
409         setLastLogEntry(context, 1, 100,
410                 new MockRaftActorContext.MockPayload(""));
411
412         entries = Arrays.asList(
413                 newReplicatedLogEntry(2, 101, "foo"));
414
415         // leader-2 is becoming the leader now and it says the commitIndex is 45
416         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
417         follower.handleMessage(leaderActor, appendEntries);
418
419         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
420
421         // We get a new message saying initial status is not done
422         assertFalse(syncStatus.isInitialSyncDone());
423     }
424
425     @Test
426     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
427         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
428
429         MockRaftActorContext context = createActorContext();
430
431         List<ReplicatedLogEntry> entries = Arrays.asList(
432                 newReplicatedLogEntry(2, 101, "foo"));
433
434         // The new commitIndex is 101
435         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
436
437         follower = createBehavior(context);
438         follower.handleMessage(leaderActor, appendEntries);
439
440         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
441                 FollowerInitialSyncUpStatus.class);
442
443         assertFalse(syncStatus.isInitialSyncDone());
444
445         // Clear all the messages
446         MessageCollectorActor.clearMessages(followerActor);
447
448         context.setLastApplied(101);
449         context.setCommitIndex(101);
450         setLastLogEntry(context, 1, 101,
451                 new MockRaftActorContext.MockPayload(""));
452
453         entries = Arrays.asList(
454                 newReplicatedLogEntry(2, 101, "foo"));
455
456         // The new commitIndex is 101
457         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
458         follower.handleMessage(leaderActor, appendEntries);
459
460         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
461
462         assertTrue(syncStatus.isInitialSyncDone());
463
464         // Clear all the messages
465         MessageCollectorActor.clearMessages(followerActor);
466
467         context.setLastApplied(100);
468         setLastLogEntry(context, 1, 100,
469                 new MockRaftActorContext.MockPayload(""));
470
471         entries = Arrays.asList(
472                 newReplicatedLogEntry(2, 101, "foo"));
473
474         // leader-2 is becoming the leader now and it says the commitIndex is 45
475         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
476         follower.handleMessage(leaderActor, appendEntries);
477
478         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
479
480         // We get a new message saying initial status is not done
481         assertFalse(syncStatus.isInitialSyncDone());
482     }
483
484     /**
485      * This test verifies that when an AppendEntries RPC is received by a RaftActor
486      * with a commitIndex that is greater than what has been applied to the
487      * state machine of the RaftActor, the RaftActor applies the state and
488      * sets it current applied state to the commitIndex of the sender.
489      */
490     @Test
491     public void testHandleAppendEntriesWithNewerCommitIndex() {
492         logStart("testHandleAppendEntriesWithNewerCommitIndex");
493
494         MockRaftActorContext context = createActorContext();
495
496         context.setLastApplied(100);
497         setLastLogEntry(context, 1, 100,
498                 new MockRaftActorContext.MockPayload(""));
499         context.getReplicatedLog().setSnapshotIndex(99);
500
501         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
502                 newReplicatedLogEntry(2, 101, "foo"));
503
504         // The new commitIndex is 101
505         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
506
507         follower = createBehavior(context);
508         follower.handleMessage(leaderActor, appendEntries);
509
510         assertEquals("getLastApplied", 101L, context.getLastApplied());
511     }
512
513     /**
514      * This test verifies that when an AppendEntries is received a specific prevLogTerm
515      * which does not match the term that is in RaftActors log entry at prevLogIndex
516      * then the RaftActor does not change it's state and it returns a failure.
517      */
518     @Test
519     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
520         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
521
522         MockRaftActorContext context = createActorContext();
523
524         // First set the receivers term to lower number
525         context.getTermInformation().update(95, "test");
526
527         // AppendEntries is now sent with a bigger term
528         // this will set the receivers term to be the same as the sender's term
529         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
530                 (short)0);
531
532         follower = createBehavior(context);
533
534         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
535
536         Assert.assertSame(follower, newBehavior);
537
538         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
539                 AppendEntriesReply.class);
540
541         assertEquals("isSuccess", false, reply.isSuccess());
542     }
543
544     /**
545      * This test verifies that when a new AppendEntries message is received with
546      * new entries and the logs of the sender and receiver match that the new
547      * entries get added to the log and the log is incremented by the number of
548      * entries received in appendEntries.
549      */
550     @Test
551     public void testHandleAppendEntriesAddNewEntries() {
552         logStart("testHandleAppendEntriesAddNewEntries");
553
554         MockRaftActorContext context = createActorContext();
555
556         // First set the receivers term to lower number
557         context.getTermInformation().update(1, "test");
558
559         // Prepare the receivers log
560         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
561         log.append(newReplicatedLogEntry(1, 0, "zero"));
562         log.append(newReplicatedLogEntry(1, 1, "one"));
563         log.append(newReplicatedLogEntry(1, 2, "two"));
564
565         context.setReplicatedLog(log);
566
567         // Prepare the entries to be sent with AppendEntries
568         List<ReplicatedLogEntry> entries = new ArrayList<>();
569         entries.add(newReplicatedLogEntry(1, 3, "three"));
570         entries.add(newReplicatedLogEntry(1, 4, "four"));
571
572         // Send appendEntries with the same term as was set on the receiver
573         // before the new behavior was created (1 in this case)
574         // This will not work for a Candidate because as soon as a Candidate
575         // is created it increments the term
576         short leaderPayloadVersion = 10;
577         String leaderId = "leader-1";
578         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
579
580         follower = createBehavior(context);
581
582         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
583
584         Assert.assertSame(follower, newBehavior);
585
586         assertEquals("Next index", 5, log.last().getIndex() + 1);
587         assertEquals("Entry 3", entries.get(0), log.get(3));
588         assertEquals("Entry 4", entries.get(1), log.get(4));
589
590         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
591         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
592
593         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
594     }
595
596     /**
597      * This test verifies that when a new AppendEntries message is received with
598      * new entries and the logs of the sender and receiver are out-of-sync that
599      * the log is first corrected by removing the out of sync entries from the
600      * log and then adding in the new entries sent with the AppendEntries message.
601      */
602     @Test
603     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
604         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
605
606         MockRaftActorContext context = createActorContext();
607
608         // First set the receivers term to lower number
609         context.getTermInformation().update(1, "test");
610
611         // Prepare the receivers log
612         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
613         log.append(newReplicatedLogEntry(1, 0, "zero"));
614         log.append(newReplicatedLogEntry(1, 1, "one"));
615         log.append(newReplicatedLogEntry(1, 2, "two"));
616
617         context.setReplicatedLog(log);
618
619         // Prepare the entries to be sent with AppendEntries
620         List<ReplicatedLogEntry> entries = new ArrayList<>();
621         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
622         entries.add(newReplicatedLogEntry(2, 3, "three"));
623
624         // Send appendEntries with the same term as was set on the receiver
625         // before the new behavior was created (1 in this case)
626         // This will not work for a Candidate because as soon as a Candidate
627         // is created it increments the term
628         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
629
630         follower = createBehavior(context);
631
632         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
633
634         Assert.assertSame(follower, newBehavior);
635
636         // The entry at index 2 will be found out-of-sync with the leader
637         // and will be removed
638         // Then the two new entries will be added to the log
639         // Thus making the log to have 4 entries
640         assertEquals("Next index", 4, log.last().getIndex() + 1);
641         //assertEquals("Entry 2", entries.get(0), log.get(2));
642
643         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
644
645         // Check that the entry at index 2 has the new data
646         assertEquals("Entry 2", entries.get(0), log.get(2));
647
648         assertEquals("Entry 3", entries.get(1), log.get(3));
649
650         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
651     }
652
653     @Test
654     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
655         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
656
657         MockRaftActorContext context = createActorContext();
658
659         // First set the receivers term to lower number
660         context.getTermInformation().update(1, "test");
661
662         // Prepare the receivers log
663         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
664         log.append(newReplicatedLogEntry(1, 0, "zero"));
665         log.append(newReplicatedLogEntry(1, 1, "one"));
666         log.append(newReplicatedLogEntry(1, 2, "two"));
667
668         context.setReplicatedLog(log);
669
670         // Prepare the entries to be sent with AppendEntries
671         List<ReplicatedLogEntry> entries = new ArrayList<>();
672         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
673         entries.add(newReplicatedLogEntry(2, 3, "three"));
674
675         // Send appendEntries with the same term as was set on the receiver
676         // before the new behavior was created (1 in this case)
677         // This will not work for a Candidate because as soon as a Candidate
678         // is created it increments the term
679         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
680
681         context.setRaftPolicy(createRaftPolicy(false, true));
682         follower = createBehavior(context);
683
684         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
685
686         Assert.assertSame(follower, newBehavior);
687
688         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
689     }
690
691     @Test
692     public void testHandleAppendEntriesPreviousLogEntryMissing() {
693         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
694
695         final MockRaftActorContext context = createActorContext();
696
697         // Prepare the receivers log
698         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
699         log.append(newReplicatedLogEntry(1, 0, "zero"));
700         log.append(newReplicatedLogEntry(1, 1, "one"));
701         log.append(newReplicatedLogEntry(1, 2, "two"));
702
703         context.setReplicatedLog(log);
704
705         // Prepare the entries to be sent with AppendEntries
706         List<ReplicatedLogEntry> entries = new ArrayList<>();
707         entries.add(newReplicatedLogEntry(1, 4, "four"));
708
709         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
710
711         follower = createBehavior(context);
712
713         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
714
715         Assert.assertSame(follower, newBehavior);
716
717         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
718     }
719
720     @Test
721     public void testHandleAppendEntriesWithExistingLogEntry() {
722         logStart("testHandleAppendEntriesWithExistingLogEntry");
723
724         MockRaftActorContext context = createActorContext();
725
726         context.getTermInformation().update(1, "test");
727
728         // Prepare the receivers log
729         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
730         log.append(newReplicatedLogEntry(1, 0, "zero"));
731         log.append(newReplicatedLogEntry(1, 1, "one"));
732
733         context.setReplicatedLog(log);
734
735         // Send the last entry again.
736         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
737
738         follower = createBehavior(context);
739
740         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
741
742         assertEquals("Next index", 2, log.last().getIndex() + 1);
743         assertEquals("Entry 1", entries.get(0), log.get(1));
744
745         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
746
747         // Send the last entry again and also a new one.
748
749         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
750
751         MessageCollectorActor.clearMessages(leaderActor);
752         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
753
754         assertEquals("Next index", 3, log.last().getIndex() + 1);
755         assertEquals("Entry 1", entries.get(0), log.get(1));
756         assertEquals("Entry 2", entries.get(1), log.get(2));
757
758         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
759     }
760
761     @Test
762     public void testHandleAppendEntriesAfterInstallingSnapshot() {
763         logStart("testHandleAppendAfterInstallingSnapshot");
764
765         MockRaftActorContext context = createActorContext();
766
767         // Prepare the receivers log
768         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
769
770         // Set up a log as if it has been snapshotted
771         log.setSnapshotIndex(3);
772         log.setSnapshotTerm(1);
773
774         context.setReplicatedLog(log);
775
776         // Prepare the entries to be sent with AppendEntries
777         List<ReplicatedLogEntry> entries = new ArrayList<>();
778         entries.add(newReplicatedLogEntry(1, 4, "four"));
779
780         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
781
782         follower = createBehavior(context);
783
784         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
785
786         Assert.assertSame(follower, newBehavior);
787
788         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
789     }
790
791
792     /**
793      * This test verifies that when InstallSnapshot is received by
794      * the follower its applied correctly.
795      */
796     @Test
797     public void testHandleInstallSnapshot() {
798         logStart("testHandleInstallSnapshot");
799
800         MockRaftActorContext context = createActorContext();
801         context.getTermInformation().update(1, "leader");
802
803         follower = createBehavior(context);
804
805         ByteString bsSnapshot = createSnapshot();
806         int offset = 0;
807         int snapshotLength = bsSnapshot.size();
808         int chunkSize = 50;
809         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
810         int lastIncludedIndex = 1;
811         int chunkIndex = 1;
812         InstallSnapshot lastInstallSnapshot = null;
813
814         for (int i = 0; i < totalChunks; i++) {
815             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
816             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
817                     chunkData, chunkIndex, totalChunks);
818             follower.handleMessage(leaderActor, lastInstallSnapshot);
819             offset = offset + 50;
820             lastIncludedIndex++;
821             chunkIndex++;
822         }
823
824         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
825                 ApplySnapshot.class);
826         Snapshot snapshot = applySnapshot.getSnapshot();
827         assertNotNull(lastInstallSnapshot);
828         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
829         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
830                 snapshot.getLastAppliedTerm());
831         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
832                 snapshot.getLastAppliedIndex());
833         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
834         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
835         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
836         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
837         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
838         applySnapshot.getCallback().onSuccess();
839
840         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
841                 leaderActor, InstallSnapshotReply.class);
842         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
843
844         chunkIndex = 1;
845         for (InstallSnapshotReply reply: replies) {
846             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
847             assertEquals("getTerm", 1, reply.getTerm());
848             assertEquals("isSuccess", true, reply.isSuccess());
849             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
850         }
851
852         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
853     }
854
855     /**
856      * Verify that when an AppendEntries is sent to a follower during a snapshot install
857      * the Follower short-circuits the processing of the AppendEntries message.
858      */
859     @Test
860     public void testReceivingAppendEntriesDuringInstallSnapshot() {
861         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
862
863         MockRaftActorContext context = createActorContext();
864
865         follower = createBehavior(context);
866
867         ByteString bsSnapshot  = createSnapshot();
868         int snapshotLength = bsSnapshot.size();
869         int chunkSize = 50;
870         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
871         int lastIncludedIndex = 1;
872
873         // Check that snapshot installation is not in progress
874         assertNull(follower.getSnapshotTracker());
875
876         // Make sure that we have more than 1 chunk to send
877         assertTrue(totalChunks > 1);
878
879         // Send an install snapshot with the first chunk to start the process of installing a snapshot
880         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
881         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
882                 chunkData, 1, totalChunks));
883
884         // Check if snapshot installation is in progress now
885         assertNotNull(follower.getSnapshotTracker());
886
887         // Send an append entry
888         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
889                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
890
891         follower.handleMessage(leaderActor, appendEntries);
892
893         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
894         assertEquals("isSuccess", true, reply.isSuccess());
895         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
896         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
897         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
898
899         assertNotNull(follower.getSnapshotTracker());
900     }
901
902     @Test
903     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
904         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
905
906         MockRaftActorContext context = createActorContext();
907
908         follower = createBehavior(context);
909
910         ByteString bsSnapshot  = createSnapshot();
911         int snapshotLength = bsSnapshot.size();
912         int chunkSize = 50;
913         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
914         int lastIncludedIndex = 1;
915
916         // Check that snapshot installation is not in progress
917         assertNull(follower.getSnapshotTracker());
918
919         // Make sure that we have more than 1 chunk to send
920         assertTrue(totalChunks > 1);
921
922         // Send an install snapshot with the first chunk to start the process of installing a snapshot
923         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
924         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
925                 chunkData, 1, totalChunks));
926
927         // Check if snapshot installation is in progress now
928         assertNotNull(follower.getSnapshotTracker());
929
930         // Send appendEntries with a new term and leader.
931         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
932                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
933
934         follower.handleMessage(leaderActor, appendEntries);
935
936         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
937         assertEquals("isSuccess", true, reply.isSuccess());
938         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
939         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
940         assertEquals("getTerm", 2, reply.getTerm());
941
942         assertNull(follower.getSnapshotTracker());
943     }
944
945     @Test
946     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
947         logStart("testInitialSyncUpWithHandleInstallSnapshot");
948
949         MockRaftActorContext context = createActorContext();
950         context.setCommitIndex(-1);
951
952         follower = createBehavior(context);
953
954         ByteString bsSnapshot  = createSnapshot();
955         int offset = 0;
956         int snapshotLength = bsSnapshot.size();
957         int chunkSize = 50;
958         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
959         int lastIncludedIndex = 1;
960         int chunkIndex = 1;
961         InstallSnapshot lastInstallSnapshot = null;
962
963         for (int i = 0; i < totalChunks; i++) {
964             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
965             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
966                     chunkData, chunkIndex, totalChunks);
967             follower.handleMessage(leaderActor, lastInstallSnapshot);
968             offset = offset + 50;
969             lastIncludedIndex++;
970             chunkIndex++;
971         }
972
973         FollowerInitialSyncUpStatus syncStatus =
974                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
975
976         assertFalse(syncStatus.isInitialSyncDone());
977
978         // Clear all the messages
979         MessageCollectorActor.clearMessages(followerActor);
980
981         context.setLastApplied(101);
982         context.setCommitIndex(101);
983         setLastLogEntry(context, 1, 101,
984                 new MockRaftActorContext.MockPayload(""));
985
986         List<ReplicatedLogEntry> entries = Arrays.asList(
987                 newReplicatedLogEntry(2, 101, "foo"));
988
989         // The new commitIndex is 101
990         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
991         follower.handleMessage(leaderActor, appendEntries);
992
993         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
994
995         assertTrue(syncStatus.isInitialSyncDone());
996     }
997
998     @Test
999     public void testHandleOutOfSequenceInstallSnapshot() {
1000         logStart("testHandleOutOfSequenceInstallSnapshot");
1001
1002         MockRaftActorContext context = createActorContext();
1003
1004         follower = createBehavior(context);
1005
1006         ByteString bsSnapshot = createSnapshot();
1007
1008         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1009                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1010         follower.handleMessage(leaderActor, installSnapshot);
1011
1012         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1013                 InstallSnapshotReply.class);
1014
1015         assertEquals("isSuccess", false, reply.isSuccess());
1016         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1017         assertEquals("getTerm", 1, reply.getTerm());
1018         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1019
1020         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1021     }
1022
1023     @Test
1024     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1025         MockRaftActorContext context = createActorContext();
1026
1027         Stopwatch stopwatch = Stopwatch.createStarted();
1028
1029         follower = createBehavior(context);
1030
1031         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1032
1033         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1034
1035         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1036
1037         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1038         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1039     }
1040
1041     @Test
1042     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1043         MockRaftActorContext context = createActorContext();
1044         context.setConfigParams(new DefaultConfigParamsImpl() {
1045             @Override
1046             public FiniteDuration getElectionTimeOutInterval() {
1047                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1048             }
1049         });
1050
1051         context.setRaftPolicy(createRaftPolicy(false, false));
1052
1053         follower = createBehavior(context);
1054
1055         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1056         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1057         assertSame("handleMessage result", follower, newBehavior);
1058     }
1059
1060     @Test
1061     public void testFollowerSchedulesElectionIfNonVoting() {
1062         MockRaftActorContext context = createActorContext();
1063         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1064         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1065                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1066         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1067
1068         follower = new Follower(context, "leader", (short)1);
1069
1070         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1071                 ElectionTimeout.class);
1072         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1073         assertSame("handleMessage result", follower, newBehavior);
1074         assertNull("Expected null leaderId", follower.getLeaderId());
1075     }
1076
1077     @Test
1078     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1079         MockRaftActorContext context = createActorContext();
1080         follower = createBehavior(context);
1081         follower.handleMessage(leaderActor, new RaftRPC() {
1082             private static final long serialVersionUID = 1L;
1083
1084             @Override
1085             public long getTerm() {
1086                 return 100;
1087             }
1088         });
1089         verify(follower).scheduleElection(any(FiniteDuration.class));
1090     }
1091
1092     @Test
1093     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1094         MockRaftActorContext context = createActorContext();
1095         follower = createBehavior(context);
1096         follower.handleMessage(leaderActor, "non-raft-rpc");
1097         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1098     }
1099
1100     @Test
1101     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1102         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1103         logStart(id);
1104
1105         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1106
1107         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1108         config.setSnapshotBatchCount(2);
1109         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1110
1111         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1112         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1113         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1114                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1115         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1116                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1117         followerRaftActor.set(followerActorRef.underlyingActor());
1118         followerRaftActor.get().waitForInitializeBehaviorComplete();
1119
1120         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1121         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1122         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1123
1124         List<ReplicatedLogEntry> entries = Arrays.asList(
1125                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1126
1127         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1128
1129         followerActorRef.tell(appendEntries, leaderActor);
1130
1131         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1132         assertEquals("isSuccess", true, reply.isSuccess());
1133
1134         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1135
1136         InMemoryJournal.waitForDeleteMessagesComplete(id);
1137         InMemoryJournal.waitForWriteMessagesComplete(id);
1138         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1139         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1140         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1141         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1142         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1143         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1144         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1145
1146         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1147         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1148         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1149         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1150         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1151         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1152                 MockRaftActor.fromState(snapshot.getState()));
1153     }
1154
1155     @Test
1156     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1157         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1158         logStart(id);
1159
1160         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1161
1162         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1163         config.setSnapshotBatchCount(2);
1164         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1165
1166         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1167         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1168         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1169                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1170         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1171                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1172         followerRaftActor.set(followerActorRef.underlyingActor());
1173         followerRaftActor.get().waitForInitializeBehaviorComplete();
1174
1175         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1176         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1177         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1178
1179         List<ReplicatedLogEntry> entries = Arrays.asList(
1180                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1181                 newReplicatedLogEntry(1, 2, "three"));
1182
1183         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1184
1185         followerActorRef.tell(appendEntries, leaderActor);
1186
1187         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1188         assertEquals("isSuccess", true, reply.isSuccess());
1189
1190         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1191
1192         InMemoryJournal.waitForDeleteMessagesComplete(id);
1193         InMemoryJournal.waitForWriteMessagesComplete(id);
1194         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1195         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1196         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1197         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1198         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1199         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1200         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1201
1202         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1203         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1204         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1205         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1206         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1207         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1208                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1209
1210         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1211         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1212
1213         // Reinstate the actor from persistence
1214
1215         actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1216
1217         followerActorRef = actorFactory.createTestActor(builder.props()
1218                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1219         followerRaftActor.set(followerActorRef.underlyingActor());
1220         followerRaftActor.get().waitForInitializeBehaviorComplete();
1221
1222         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1223         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1224         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1225         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1226         assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1227                 entries.get(2).getData()), followerRaftActor.get().getState());
1228     }
1229
1230     @Test
1231     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1232         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1233         logStart(id);
1234
1235         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1236
1237         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1238         config.setSnapshotBatchCount(1);
1239         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1240
1241         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1242         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1243         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1244                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1245         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1246                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1247         followerRaftActor.set(followerActorRef.underlyingActor());
1248         followerRaftActor.get().waitForInitializeBehaviorComplete();
1249
1250         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1251         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1252         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1253
1254         List<ReplicatedLogEntry> entries = Arrays.asList(
1255                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1256                 newReplicatedLogEntry(1, 2, "three"));
1257
1258         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1259
1260         followerActorRef.tell(appendEntries, leaderActor);
1261
1262         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1263         assertEquals("isSuccess", true, reply.isSuccess());
1264
1265         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1266
1267         InMemoryJournal.waitForDeleteMessagesComplete(id);
1268         InMemoryJournal.waitForWriteMessagesComplete(id);
1269         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1270         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1271         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1272         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1273         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1274         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1275         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1276
1277         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1278         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1279         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1280         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1281         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1282         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1283         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1284         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1285                 MockRaftActor.fromState(snapshot.getState()));
1286     }
1287
1288     @SuppressWarnings("checkstyle:IllegalCatch")
1289     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1290             final AtomicReference<MockRaftActor> followerRaftActor) {
1291         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1292             @Override
1293             public void createSnapshot(final ActorRef actorRef,
1294                     final java.util.Optional<OutputStream> installSnapshotStream) {
1295                 try {
1296                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1297                             installSnapshotStream), actorRef);
1298                 } catch (RuntimeException e) {
1299                     throw e;
1300                 } catch (Exception e) {
1301                     throw new RuntimeException(e);
1302                 }
1303             }
1304
1305             @Override
1306             public void applySnapshot(final State snapshotState) {
1307             }
1308
1309             @Override
1310             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1311                 throw new UnsupportedOperationException();
1312             }
1313         };
1314         return snapshotCohort;
1315     }
1316
1317     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1318         int snapshotLength = bs.size();
1319         int start = offset;
1320         int size = chunkSize;
1321         if (chunkSize > snapshotLength) {
1322             size = snapshotLength;
1323         } else {
1324             if (start + chunkSize > snapshotLength) {
1325                 size = snapshotLength - start;
1326             }
1327         }
1328
1329         byte[] nextChunk = new byte[size];
1330         bs.copyTo(nextChunk, start, 0, size);
1331         return nextChunk;
1332     }
1333
1334     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1335             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1336         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1337     }
1338
1339     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1340             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1341             final boolean expForceInstallSnapshot) {
1342
1343         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1344                 AppendEntriesReply.class);
1345
1346         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1347         assertEquals("getTerm", expTerm, reply.getTerm());
1348         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1349         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1350         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1351         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1352         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1353     }
1354
1355
1356     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1357         return new SimpleReplicatedLogEntry(index, term,
1358                 new MockRaftActorContext.MockPayload(data));
1359     }
1360
1361     private ByteString createSnapshot() {
1362         HashMap<String, String> followerSnapshot = new HashMap<>();
1363         followerSnapshot.put("1", "A");
1364         followerSnapshot.put("2", "B");
1365         followerSnapshot.put("3", "C");
1366
1367         return toByteString(followerSnapshot);
1368     }
1369
1370     @Override
1371     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1372             final ActorRef actorRef, final RaftRPC rpc) {
1373         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1374
1375         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1376         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1377     }
1378
1379     @Override
1380     protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1381         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1382         assertEquals("isSuccess", true, reply.isSuccess());
1383     }
1384 }