Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.Props;
24 import akka.dispatch.Dispatchers;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.base.Throwables;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import com.google.protobuf.ByteString;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
48 import org.opendaylight.controller.cluster.raft.RaftActorContext;
49 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
50 import org.opendaylight.controller.cluster.raft.RaftActorTest;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
52 import org.opendaylight.controller.cluster.raft.Snapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
55 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
56 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
59 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
60 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
61 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
65 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
66 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
68 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
70 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
74 import scala.concurrent.duration.FiniteDuration;
75
76 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
77
78     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
79             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
80
81     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
82             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
83
84     private Follower follower;
85
86     private final short payloadVersion = 5;
87
88     @Override
89     @After
90     public void tearDown() throws Exception {
91         if (follower != null) {
92             follower.close();
93         }
94
95         super.tearDown();
96     }
97
98     @Override
99     protected Follower createBehavior(RaftActorContext actorContext) {
100         return spy(new Follower(actorContext));
101     }
102
103     @Override
104     protected  MockRaftActorContext createActorContext() {
105         return createActorContext(followerActor);
106     }
107
108     @Override
109     protected  MockRaftActorContext createActorContext(ActorRef actorRef) {
110         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
111         context.setPayloadVersion(payloadVersion );
112         return context;
113     }
114
115     @Test
116     public void testThatAnElectionTimeoutIsTriggered() {
117         MockRaftActorContext actorContext = createActorContext();
118         follower = new Follower(actorContext);
119
120         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
121                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
122     }
123
124     @Test
125     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
126         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
127
128         MockRaftActorContext context = createActorContext();
129         follower = new Follower(context);
130
131         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
132                 TimeUnit.MILLISECONDS);
133         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
134
135         assertTrue(raftBehavior instanceof Candidate);
136     }
137
138     @Test
139     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
140         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
141
142         MockRaftActorContext context = createActorContext();
143         ((DefaultConfigParamsImpl) context.getConfigParams())
144                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
145         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
146
147         follower = new Follower(context);
148         context.setCurrentBehavior(follower);
149
150         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
151                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
152         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
153                 -1, -1, (short) 1));
154
155         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
156         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
157         assertTrue(raftBehavior instanceof Follower);
158
159         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
160                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
161         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
162                 -1, -1, (short) 1));
163
164         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
165         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
166         assertTrue(raftBehavior instanceof Follower);
167     }
168
169     @Test
170     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
171         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
172
173         MockRaftActorContext context = createActorContext();
174         long term = 1000;
175         context.getTermInformation().update(term, null);
176
177         follower = createBehavior(context);
178
179         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
180
181         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
182
183         assertEquals("isVoteGranted", true, reply.isVoteGranted());
184         assertEquals("getTerm", term, reply.getTerm());
185         verify(follower).scheduleElection(any(FiniteDuration.class));
186     }
187
188     @Test
189     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
190         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
191
192         MockRaftActorContext context = createActorContext();
193         long term = 1000;
194         context.getTermInformation().update(term, "test");
195
196         follower = createBehavior(context);
197
198         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
199
200         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
201
202         assertEquals("isVoteGranted", false, reply.isVoteGranted());
203         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
204     }
205
206
207     @Test
208     public void testHandleFirstAppendEntries() throws Exception {
209         logStart("testHandleFirstAppendEntries");
210
211         MockRaftActorContext context = createActorContext();
212         context.getReplicatedLog().clear(0,2);
213         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
214         context.getReplicatedLog().setSnapshotIndex(99);
215
216         List<ReplicatedLogEntry> entries = Arrays.asList(
217                 newReplicatedLogEntry(2, 101, "foo"));
218
219         Assert.assertEquals(1, context.getReplicatedLog().size());
220
221         // The new commitIndex is 101
222         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
223
224         follower = createBehavior(context);
225         follower.handleMessage(leaderActor, appendEntries);
226
227         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
228                 FollowerInitialSyncUpStatus.class);
229         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
230
231         assertFalse(syncStatus.isInitialSyncDone());
232         assertTrue("append entries reply should be true", reply.isSuccess());
233     }
234
235     @Test
236     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
237         logStart("testHandleFirstAppendEntries");
238
239         MockRaftActorContext context = createActorContext();
240
241         List<ReplicatedLogEntry> entries = Arrays.asList(
242                 newReplicatedLogEntry(2, 101, "foo"));
243
244         // The new commitIndex is 101
245         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
246
247         follower = createBehavior(context);
248         follower.handleMessage(leaderActor, appendEntries);
249
250         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
251                 FollowerInitialSyncUpStatus.class);
252         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
253
254         assertFalse(syncStatus.isInitialSyncDone());
255         assertFalse("append entries reply should be false", reply.isSuccess());
256     }
257
258     @Test
259     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
260             throws Exception {
261         logStart("testHandleFirstAppendEntries");
262
263         MockRaftActorContext context = createActorContext();
264         context.getReplicatedLog().clear(0,2);
265         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
266         context.getReplicatedLog().setSnapshotIndex(99);
267
268         List<ReplicatedLogEntry> entries = Arrays.asList(
269                 newReplicatedLogEntry(2, 101, "foo"));
270
271         // The new commitIndex is 101
272         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
273
274         follower = createBehavior(context);
275         follower.handleMessage(leaderActor, appendEntries);
276
277         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
278                 FollowerInitialSyncUpStatus.class);
279         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
280
281         assertFalse(syncStatus.isInitialSyncDone());
282         assertTrue("append entries reply should be true", reply.isSuccess());
283     }
284
285     @Test
286     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
287             throws Exception {
288         logStart("testHandleFirstAppendEntries");
289
290         MockRaftActorContext context = createActorContext();
291         context.getReplicatedLog().clear(0,2);
292         context.getReplicatedLog().setSnapshotIndex(100);
293
294         List<ReplicatedLogEntry> entries = Arrays.asList(
295                 newReplicatedLogEntry(2, 101, "foo"));
296
297         // The new commitIndex is 101
298         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
299
300         follower = createBehavior(context);
301         follower.handleMessage(leaderActor, appendEntries);
302
303         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
304                 FollowerInitialSyncUpStatus.class);
305         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
306
307         assertFalse(syncStatus.isInitialSyncDone());
308         assertTrue("append entries reply should be true", reply.isSuccess());
309     }
310
311     @Test
312     public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
313             throws Exception {
314         logStart(
315                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
316
317         MockRaftActorContext context = createActorContext();
318         context.getReplicatedLog().clear(0,2);
319         context.getReplicatedLog().setSnapshotIndex(100);
320
321         List<ReplicatedLogEntry> entries = Arrays.asList(
322                 newReplicatedLogEntry(2, 105, "foo"));
323
324         // The new commitIndex is 101
325         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
326
327         follower = createBehavior(context);
328         follower.handleMessage(leaderActor, appendEntries);
329
330         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
331                 FollowerInitialSyncUpStatus.class);
332         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
333
334         assertFalse(syncStatus.isInitialSyncDone());
335         assertFalse("append entries reply should be false", reply.isSuccess());
336     }
337
338     @Test
339     public void testHandleSyncUpAppendEntries() throws Exception {
340         logStart("testHandleSyncUpAppendEntries");
341
342         MockRaftActorContext context = createActorContext();
343
344         List<ReplicatedLogEntry> entries = Arrays.asList(
345                 newReplicatedLogEntry(2, 101, "foo"));
346
347         // The new commitIndex is 101
348         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
349
350         follower = createBehavior(context);
351         follower.handleMessage(leaderActor, appendEntries);
352
353         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
354                 FollowerInitialSyncUpStatus.class);
355
356         assertFalse(syncStatus.isInitialSyncDone());
357
358         // Clear all the messages
359         followerActor.underlyingActor().clear();
360
361         context.setLastApplied(101);
362         context.setCommitIndex(101);
363         setLastLogEntry(context, 1, 101,
364                 new MockRaftActorContext.MockPayload(""));
365
366         entries = Arrays.asList(
367                 newReplicatedLogEntry(2, 101, "foo"));
368
369         // The new commitIndex is 101
370         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
371         follower.handleMessage(leaderActor, appendEntries);
372
373         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
374
375         assertTrue(syncStatus.isInitialSyncDone());
376
377         followerActor.underlyingActor().clear();
378
379         // Sending the same message again should not generate another message
380
381         follower.handleMessage(leaderActor, appendEntries);
382
383         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
384
385         assertNull(syncStatus);
386
387     }
388
389     @Test
390     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
391         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
392
393         MockRaftActorContext context = createActorContext();
394
395         List<ReplicatedLogEntry> entries = Arrays.asList(
396                 newReplicatedLogEntry(2, 101, "foo"));
397
398         // The new commitIndex is 101
399         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
400
401         follower = createBehavior(context);
402         follower.handleMessage(leaderActor, appendEntries);
403
404         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
405                 FollowerInitialSyncUpStatus.class);
406
407         assertFalse(syncStatus.isInitialSyncDone());
408
409         // Clear all the messages
410         followerActor.underlyingActor().clear();
411
412         context.setLastApplied(100);
413         setLastLogEntry(context, 1, 100,
414                 new MockRaftActorContext.MockPayload(""));
415
416         entries = Arrays.asList(
417                 newReplicatedLogEntry(2, 101, "foo"));
418
419         // leader-2 is becoming the leader now and it says the commitIndex is 45
420         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
421         follower.handleMessage(leaderActor, appendEntries);
422
423         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
424
425         // We get a new message saying initial status is not done
426         assertFalse(syncStatus.isInitialSyncDone());
427
428     }
429
430
431     @Test
432     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
433         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
434
435         MockRaftActorContext context = createActorContext();
436
437         List<ReplicatedLogEntry> entries = Arrays.asList(
438                 newReplicatedLogEntry(2, 101, "foo"));
439
440         // The new commitIndex is 101
441         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
442
443         follower = createBehavior(context);
444         follower.handleMessage(leaderActor, appendEntries);
445
446         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
447                 FollowerInitialSyncUpStatus.class);
448
449         assertFalse(syncStatus.isInitialSyncDone());
450
451         // Clear all the messages
452         followerActor.underlyingActor().clear();
453
454         context.setLastApplied(101);
455         context.setCommitIndex(101);
456         setLastLogEntry(context, 1, 101,
457                 new MockRaftActorContext.MockPayload(""));
458
459         entries = Arrays.asList(
460                 newReplicatedLogEntry(2, 101, "foo"));
461
462         // The new commitIndex is 101
463         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
464         follower.handleMessage(leaderActor, appendEntries);
465
466         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
467
468         assertTrue(syncStatus.isInitialSyncDone());
469
470         // Clear all the messages
471         followerActor.underlyingActor().clear();
472
473         context.setLastApplied(100);
474         setLastLogEntry(context, 1, 100,
475                 new MockRaftActorContext.MockPayload(""));
476
477         entries = Arrays.asList(
478                 newReplicatedLogEntry(2, 101, "foo"));
479
480         // leader-2 is becoming the leader now and it says the commitIndex is 45
481         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
482         follower.handleMessage(leaderActor, appendEntries);
483
484         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
485
486         // We get a new message saying initial status is not done
487         assertFalse(syncStatus.isInitialSyncDone());
488
489     }
490
491
492     /**
493      * This test verifies that when an AppendEntries RPC is received by a RaftActor
494      * with a commitIndex that is greater than what has been applied to the
495      * state machine of the RaftActor, the RaftActor applies the state and
496      * sets it current applied state to the commitIndex of the sender.
497      */
498     @Test
499     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
500         logStart("testHandleAppendEntriesWithNewerCommitIndex");
501
502         MockRaftActorContext context = createActorContext();
503
504         context.setLastApplied(100);
505         setLastLogEntry(context, 1, 100,
506                 new MockRaftActorContext.MockPayload(""));
507         context.getReplicatedLog().setSnapshotIndex(99);
508
509         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
510                 newReplicatedLogEntry(2, 101, "foo"));
511
512         // The new commitIndex is 101
513         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
514
515         follower = createBehavior(context);
516         follower.handleMessage(leaderActor, appendEntries);
517
518         assertEquals("getLastApplied", 101L, context.getLastApplied());
519     }
520
521     /**
522      * This test verifies that when an AppendEntries is received a specific prevLogTerm
523      * which does not match the term that is in RaftActors log entry at prevLogIndex
524      * then the RaftActor does not change it's state and it returns a failure.
525      */
526     @Test
527     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
528         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
529
530         MockRaftActorContext context = createActorContext();
531
532         // First set the receivers term to lower number
533         context.getTermInformation().update(95, "test");
534
535         // AppendEntries is now sent with a bigger term
536         // this will set the receivers term to be the same as the sender's term
537         AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
538
539         follower = createBehavior(context);
540
541         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
542
543         Assert.assertSame(follower, newBehavior);
544
545         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
546                 AppendEntriesReply.class);
547
548         assertEquals("isSuccess", false, reply.isSuccess());
549     }
550
551     /**
552      * This test verifies that when a new AppendEntries message is received with
553      * new entries and the logs of the sender and receiver match that the new
554      * entries get added to the log and the log is incremented by the number of
555      * entries received in appendEntries.
556      */
557     @Test
558     public void testHandleAppendEntriesAddNewEntries() {
559         logStart("testHandleAppendEntriesAddNewEntries");
560
561         MockRaftActorContext context = createActorContext();
562
563         // First set the receivers term to lower number
564         context.getTermInformation().update(1, "test");
565
566         // Prepare the receivers log
567         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
568         log.append(newReplicatedLogEntry(1, 0, "zero"));
569         log.append(newReplicatedLogEntry(1, 1, "one"));
570         log.append(newReplicatedLogEntry(1, 2, "two"));
571
572         context.setReplicatedLog(log);
573
574         // Prepare the entries to be sent with AppendEntries
575         List<ReplicatedLogEntry> entries = new ArrayList<>();
576         entries.add(newReplicatedLogEntry(1, 3, "three"));
577         entries.add(newReplicatedLogEntry(1, 4, "four"));
578
579         // Send appendEntries with the same term as was set on the receiver
580         // before the new behavior was created (1 in this case)
581         // This will not work for a Candidate because as soon as a Candidate
582         // is created it increments the term
583         short leaderPayloadVersion = 10;
584         String leaderId = "leader-1";
585         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
586
587         follower = createBehavior(context);
588
589         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
590
591         Assert.assertSame(follower, newBehavior);
592
593         assertEquals("Next index", 5, log.last().getIndex() + 1);
594         assertEquals("Entry 3", entries.get(0), log.get(3));
595         assertEquals("Entry 4", entries.get(1), log.get(4));
596
597         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
598         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
599
600         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
601     }
602
603     /**
604      * This test verifies that when a new AppendEntries message is received with
605      * new entries and the logs of the sender and receiver are out-of-sync that
606      * the log is first corrected by removing the out of sync entries from the
607      * log and then adding in the new entries sent with the AppendEntries message.
608      */
609     @Test
610     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
611         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
612
613         MockRaftActorContext context = createActorContext();
614
615         // First set the receivers term to lower number
616         context.getTermInformation().update(1, "test");
617
618         // Prepare the receivers log
619         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
620         log.append(newReplicatedLogEntry(1, 0, "zero"));
621         log.append(newReplicatedLogEntry(1, 1, "one"));
622         log.append(newReplicatedLogEntry(1, 2, "two"));
623
624         context.setReplicatedLog(log);
625
626         // Prepare the entries to be sent with AppendEntries
627         List<ReplicatedLogEntry> entries = new ArrayList<>();
628         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
629         entries.add(newReplicatedLogEntry(2, 3, "three"));
630
631         // Send appendEntries with the same term as was set on the receiver
632         // before the new behavior was created (1 in this case)
633         // This will not work for a Candidate because as soon as a Candidate
634         // is created it increments the term
635         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
636
637         follower = createBehavior(context);
638
639         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
640
641         Assert.assertSame(follower, newBehavior);
642
643         // The entry at index 2 will be found out-of-sync with the leader
644         // and will be removed
645         // Then the two new entries will be added to the log
646         // Thus making the log to have 4 entries
647         assertEquals("Next index", 4, log.last().getIndex() + 1);
648         //assertEquals("Entry 2", entries.get(0), log.get(2));
649
650         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
651
652         // Check that the entry at index 2 has the new data
653         assertEquals("Entry 2", entries.get(0), log.get(2));
654
655         assertEquals("Entry 3", entries.get(1), log.get(3));
656
657         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
658     }
659
660     @Test
661     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
662         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
663
664         MockRaftActorContext context = createActorContext();
665
666         // First set the receivers term to lower number
667         context.getTermInformation().update(1, "test");
668
669         // Prepare the receivers log
670         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
671         log.append(newReplicatedLogEntry(1, 0, "zero"));
672         log.append(newReplicatedLogEntry(1, 1, "one"));
673         log.append(newReplicatedLogEntry(1, 2, "two"));
674
675         context.setReplicatedLog(log);
676
677         // Prepare the entries to be sent with AppendEntries
678         List<ReplicatedLogEntry> entries = new ArrayList<>();
679         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
680         entries.add(newReplicatedLogEntry(2, 3, "three"));
681
682         // Send appendEntries with the same term as was set on the receiver
683         // before the new behavior was created (1 in this case)
684         // This will not work for a Candidate because as soon as a Candidate
685         // is created it increments the term
686         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
687
688         context.setRaftPolicy(createRaftPolicy(false, true));
689         follower = createBehavior(context);
690
691         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
692
693         Assert.assertSame(follower, newBehavior);
694
695         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
696     }
697
698     @Test
699     public void testHandleAppendEntriesPreviousLogEntryMissing() {
700         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
701
702         final MockRaftActorContext context = createActorContext();
703
704         // Prepare the receivers log
705         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
706         log.append(newReplicatedLogEntry(1, 0, "zero"));
707         log.append(newReplicatedLogEntry(1, 1, "one"));
708         log.append(newReplicatedLogEntry(1, 2, "two"));
709
710         context.setReplicatedLog(log);
711
712         // Prepare the entries to be sent with AppendEntries
713         List<ReplicatedLogEntry> entries = new ArrayList<>();
714         entries.add(newReplicatedLogEntry(1, 4, "four"));
715
716         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
717
718         follower = createBehavior(context);
719
720         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
721
722         Assert.assertSame(follower, newBehavior);
723
724         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
725     }
726
727     @Test
728     public void testHandleAppendEntriesWithExistingLogEntry() {
729         logStart("testHandleAppendEntriesWithExistingLogEntry");
730
731         MockRaftActorContext context = createActorContext();
732
733         context.getTermInformation().update(1, "test");
734
735         // Prepare the receivers log
736         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
737         log.append(newReplicatedLogEntry(1, 0, "zero"));
738         log.append(newReplicatedLogEntry(1, 1, "one"));
739
740         context.setReplicatedLog(log);
741
742         // Send the last entry again.
743         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
744
745         follower = createBehavior(context);
746
747         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
748
749         assertEquals("Next index", 2, log.last().getIndex() + 1);
750         assertEquals("Entry 1", entries.get(0), log.get(1));
751
752         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
753
754         // Send the last entry again and also a new one.
755
756         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
757
758         leaderActor.underlyingActor().clear();
759         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
760
761         assertEquals("Next index", 3, log.last().getIndex() + 1);
762         assertEquals("Entry 1", entries.get(0), log.get(1));
763         assertEquals("Entry 2", entries.get(1), log.get(2));
764
765         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
766     }
767
768     @Test
769     public void testHandleAppendEntriesAfterInstallingSnapshot() {
770         logStart("testHandleAppendAfterInstallingSnapshot");
771
772         MockRaftActorContext context = createActorContext();
773
774         // Prepare the receivers log
775         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
776
777         // Set up a log as if it has been snapshotted
778         log.setSnapshotIndex(3);
779         log.setSnapshotTerm(1);
780
781         context.setReplicatedLog(log);
782
783         // Prepare the entries to be sent with AppendEntries
784         List<ReplicatedLogEntry> entries = new ArrayList<>();
785         entries.add(newReplicatedLogEntry(1, 4, "four"));
786
787         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
788
789         follower = createBehavior(context);
790
791         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
792
793         Assert.assertSame(follower, newBehavior);
794
795         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
796     }
797
798
799     /**
800      * This test verifies that when InstallSnapshot is received by
801      * the follower its applied correctly.
802      */
803     @Test
804     public void testHandleInstallSnapshot() throws Exception {
805         logStart("testHandleInstallSnapshot");
806
807         MockRaftActorContext context = createActorContext();
808         context.getTermInformation().update(1, "leader");
809
810         follower = createBehavior(context);
811
812         ByteString bsSnapshot  = createSnapshot();
813         int offset = 0;
814         int snapshotLength = bsSnapshot.size();
815         int chunkSize = 50;
816         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
817         int lastIncludedIndex = 1;
818         int chunkIndex = 1;
819         InstallSnapshot lastInstallSnapshot = null;
820
821         for (int i = 0; i < totalChunks; i++) {
822             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
823             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
824                     chunkData, chunkIndex, totalChunks);
825             follower.handleMessage(leaderActor, lastInstallSnapshot);
826             offset = offset + 50;
827             lastIncludedIndex++;
828             chunkIndex++;
829         }
830
831         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
832                 ApplySnapshot.class);
833         Snapshot snapshot = applySnapshot.getSnapshot();
834         assertNotNull(lastInstallSnapshot);
835         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
836         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
837                 snapshot.getLastAppliedTerm());
838         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
839                 snapshot.getLastAppliedIndex());
840         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
841         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
842         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
843         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
844         applySnapshot.getCallback().onSuccess();
845
846         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
847                 leaderActor, InstallSnapshotReply.class);
848         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
849
850         chunkIndex = 1;
851         for (InstallSnapshotReply reply: replies) {
852             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
853             assertEquals("getTerm", 1, reply.getTerm());
854             assertEquals("isSuccess", true, reply.isSuccess());
855             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
856         }
857
858         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
859     }
860
861
862     /**
863      * Verify that when an AppendEntries is sent to a follower during a snapshot install
864      * the Follower short-circuits the processing of the AppendEntries message.
865      */
866     @Test
867     public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
868         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
869
870         MockRaftActorContext context = createActorContext();
871
872         follower = createBehavior(context);
873
874         ByteString bsSnapshot  = createSnapshot();
875         int snapshotLength = bsSnapshot.size();
876         int chunkSize = 50;
877         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
878         int lastIncludedIndex = 1;
879
880         // Check that snapshot installation is not in progress
881         assertNull(follower.getSnapshotTracker());
882
883         // Make sure that we have more than 1 chunk to send
884         assertTrue(totalChunks > 1);
885
886         // Send an install snapshot with the first chunk to start the process of installing a snapshot
887         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
888         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
889                 chunkData, 1, totalChunks));
890
891         // Check if snapshot installation is in progress now
892         assertNotNull(follower.getSnapshotTracker());
893
894         // Send an append entry
895         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
896                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
897
898         follower.handleMessage(leaderActor, appendEntries);
899
900         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
901         assertEquals("isSuccess", true, reply.isSuccess());
902         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
903         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
904         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
905
906         assertNotNull(follower.getSnapshotTracker());
907     }
908
909     @Test
910     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
911         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
912
913         MockRaftActorContext context = createActorContext();
914
915         follower = createBehavior(context);
916
917         ByteString bsSnapshot  = createSnapshot();
918         int snapshotLength = bsSnapshot.size();
919         int chunkSize = 50;
920         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
921         int lastIncludedIndex = 1;
922
923         // Check that snapshot installation is not in progress
924         assertNull(follower.getSnapshotTracker());
925
926         // Make sure that we have more than 1 chunk to send
927         assertTrue(totalChunks > 1);
928
929         // Send an install snapshot with the first chunk to start the process of installing a snapshot
930         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
931         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
932                 chunkData, 1, totalChunks));
933
934         // Check if snapshot installation is in progress now
935         assertNotNull(follower.getSnapshotTracker());
936
937         // Send appendEntries with a new term and leader.
938         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
939                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
940
941         follower.handleMessage(leaderActor, appendEntries);
942
943         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
944         assertEquals("isSuccess", true, reply.isSuccess());
945         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
946         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
947         assertEquals("getTerm", 2, reply.getTerm());
948
949         assertNull(follower.getSnapshotTracker());
950     }
951
952     @Test
953     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
954         logStart("testInitialSyncUpWithHandleInstallSnapshot");
955
956         MockRaftActorContext context = createActorContext();
957         context.setCommitIndex(-1);
958
959         follower = createBehavior(context);
960
961         ByteString bsSnapshot  = createSnapshot();
962         int offset = 0;
963         int snapshotLength = bsSnapshot.size();
964         int chunkSize = 50;
965         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
966         int lastIncludedIndex = 1;
967         int chunkIndex = 1;
968         InstallSnapshot lastInstallSnapshot = null;
969
970         for (int i = 0; i < totalChunks; i++) {
971             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
972             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
973                     chunkData, chunkIndex, totalChunks);
974             follower.handleMessage(leaderActor, lastInstallSnapshot);
975             offset = offset + 50;
976             lastIncludedIndex++;
977             chunkIndex++;
978         }
979
980         FollowerInitialSyncUpStatus syncStatus =
981                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
982
983         assertFalse(syncStatus.isInitialSyncDone());
984
985         // Clear all the messages
986         followerActor.underlyingActor().clear();
987
988         context.setLastApplied(101);
989         context.setCommitIndex(101);
990         setLastLogEntry(context, 1, 101,
991                 new MockRaftActorContext.MockPayload(""));
992
993         List<ReplicatedLogEntry> entries = Arrays.asList(
994                 newReplicatedLogEntry(2, 101, "foo"));
995
996         // The new commitIndex is 101
997         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
998         follower.handleMessage(leaderActor, appendEntries);
999
1000         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1001
1002         assertTrue(syncStatus.isInitialSyncDone());
1003     }
1004
1005     @Test
1006     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
1007         logStart("testHandleOutOfSequenceInstallSnapshot");
1008
1009         MockRaftActorContext context = createActorContext();
1010
1011         follower = createBehavior(context);
1012
1013         ByteString bsSnapshot = createSnapshot();
1014
1015         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1016                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1017         follower.handleMessage(leaderActor, installSnapshot);
1018
1019         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1020                 InstallSnapshotReply.class);
1021
1022         assertEquals("isSuccess", false, reply.isSuccess());
1023         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1024         assertEquals("getTerm", 1, reply.getTerm());
1025         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1026
1027         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1028     }
1029
1030     @Test
1031     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1032         MockRaftActorContext context = createActorContext();
1033
1034         Stopwatch stopwatch = Stopwatch.createStarted();
1035
1036         follower = createBehavior(context);
1037
1038         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1039
1040         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1041
1042         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1043
1044         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1045         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1046     }
1047
1048     @Test
1049     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1050         MockRaftActorContext context = createActorContext();
1051         context.setConfigParams(new DefaultConfigParamsImpl() {
1052             @Override
1053             public FiniteDuration getElectionTimeOutInterval() {
1054                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1055             }
1056         });
1057
1058         context.setRaftPolicy(createRaftPolicy(false, false));
1059
1060         follower = createBehavior(context);
1061
1062         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1063         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1064         assertSame("handleMessage result", follower, newBehavior);
1065     }
1066
1067     @Test
1068     public void testFollowerSchedulesElectionIfNonVoting() {
1069         MockRaftActorContext context = createActorContext();
1070         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1071         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1072                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1073         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1074
1075         follower = new Follower(context, "leader", (short)1);
1076
1077         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1078                 ElectionTimeout.class);
1079         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1080         assertSame("handleMessage result", follower, newBehavior);
1081         assertNull("Expected null leaderId", follower.getLeaderId());
1082     }
1083
1084     @Test
1085     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1086         MockRaftActorContext context = createActorContext();
1087         follower = createBehavior(context);
1088         follower.handleMessage(leaderActor, new RaftRPC() {
1089             private static final long serialVersionUID = 1L;
1090
1091             @Override
1092             public long getTerm() {
1093                 return 100;
1094             }
1095         });
1096         verify(follower).scheduleElection(any(FiniteDuration.class));
1097     }
1098
1099     @Test
1100     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1101         MockRaftActorContext context = createActorContext();
1102         follower = createBehavior(context);
1103         follower.handleMessage(leaderActor, "non-raft-rpc");
1104         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1105     }
1106
1107     @Test
1108     public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
1109         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1110         logStart(id);
1111
1112         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1113
1114         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1115         config.setSnapshotBatchCount(2);
1116         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1117
1118         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1119         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1120         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1121                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1122         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1123                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1124         followerRaftActor.set(followerActorRef.underlyingActor());
1125         followerRaftActor.get().waitForInitializeBehaviorComplete();
1126
1127         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1128         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1129
1130         List<ReplicatedLogEntry> entries = Arrays.asList(
1131                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1132
1133         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1134
1135         followerActorRef.tell(appendEntries, leaderActor);
1136
1137         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1138         assertEquals("isSuccess", true, reply.isSuccess());
1139
1140         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1141
1142         InMemoryJournal.waitForDeleteMessagesComplete(id);
1143         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1144         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1145         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1146         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1147         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1148         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1149         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1150
1151         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1152         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1153         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1154         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1155         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1156         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1157                 MockRaftActor.toObject(snapshot.getState()));
1158     }
1159
1160     @Test
1161     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
1162         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1163         logStart(id);
1164
1165         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1166
1167         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1168         config.setSnapshotBatchCount(2);
1169         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1170
1171         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1172         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1173         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1174                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1175         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1176                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1177         followerRaftActor.set(followerActorRef.underlyingActor());
1178         followerRaftActor.get().waitForInitializeBehaviorComplete();
1179
1180         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1181         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1182
1183         List<ReplicatedLogEntry> entries = Arrays.asList(
1184                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1185                 newReplicatedLogEntry(1, 2, "three"));
1186
1187         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1188
1189         followerActorRef.tell(appendEntries, leaderActor);
1190
1191         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1192         assertEquals("isSuccess", true, reply.isSuccess());
1193
1194         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1195
1196         InMemoryJournal.waitForDeleteMessagesComplete(id);
1197         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1198         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1199         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1200         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1201         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1202         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1203         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1204
1205         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1206         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1207         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1208         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1209         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1210         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1211                 entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
1212
1213         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1214         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1215
1216         // Reinstate the actor from persistence
1217
1218         actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
1219
1220         followerActorRef = actorFactory.createTestActor(builder.props()
1221                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1222         followerRaftActor.set(followerActorRef.underlyingActor());
1223         followerRaftActor.get().waitForInitializeBehaviorComplete();
1224
1225         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1226         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1227         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1228         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1229         assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1230                 entries.get(2).getData()), followerRaftActor.get().getState());
1231     }
1232
1233     @Test
1234     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
1235         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1236         logStart(id);
1237
1238         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1239
1240         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1241         config.setSnapshotBatchCount(1);
1242         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1243
1244         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1245         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1246         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1247                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1248         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1249                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1250         followerRaftActor.set(followerActorRef.underlyingActor());
1251         followerRaftActor.get().waitForInitializeBehaviorComplete();
1252
1253         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1254         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1255
1256         List<ReplicatedLogEntry> entries = Arrays.asList(
1257                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1258                 newReplicatedLogEntry(1, 2, "three"));
1259
1260         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1261
1262         followerActorRef.tell(appendEntries, leaderActor);
1263
1264         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1265         assertEquals("isSuccess", true, reply.isSuccess());
1266
1267         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1268
1269         InMemoryJournal.waitForDeleteMessagesComplete(id);
1270         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1271         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1272         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1273         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1274         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1275         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1276         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1277
1278         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1279         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1280         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1281         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1282         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1283         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1284         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1285         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1286                 MockRaftActor.toObject(snapshot.getState()));
1287     }
1288
1289     @SuppressWarnings("checkstyle:IllegalCatch")
1290     private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
1291         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1292             @Override
1293             public void createSnapshot(ActorRef actorRef) {
1294                 try {
1295                     actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
1296                             followerRaftActor.get().getState()).toByteArray()), actorRef);
1297                 } catch (Exception e) {
1298                     Throwables.propagate(e);
1299                 }
1300             }
1301
1302             @Override
1303             public void applySnapshot(byte[] snapshotBytes) {
1304             }
1305         };
1306         return snapshotCohort;
1307     }
1308
1309     public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
1310         int snapshotLength = bs.size();
1311         int start = offset;
1312         int size = chunkSize;
1313         if (chunkSize > snapshotLength) {
1314             size = snapshotLength;
1315         } else {
1316             if (start + chunkSize > snapshotLength) {
1317                 size = snapshotLength - start;
1318             }
1319         }
1320
1321         byte[] nextChunk = new byte[size];
1322         bs.copyTo(nextChunk, start, 0, size);
1323         return nextChunk;
1324     }
1325
1326     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1327             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
1328         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1329     }
1330
1331     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
1332                                                    String expFollowerId, long expLogLastTerm, long expLogLastIndex,
1333                                                    boolean expForceInstallSnapshot) {
1334
1335         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1336                 AppendEntriesReply.class);
1337
1338         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1339         assertEquals("getTerm", expTerm, reply.getTerm());
1340         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1341         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1342         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1343         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1344         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1345     }
1346
1347
1348     private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
1349         return new SimpleReplicatedLogEntry(index, term,
1350                 new MockRaftActorContext.MockPayload(data));
1351     }
1352
1353     private ByteString createSnapshot() {
1354         HashMap<String, String> followerSnapshot = new HashMap<>();
1355         followerSnapshot.put("1", "A");
1356         followerSnapshot.put("2", "B");
1357         followerSnapshot.put("3", "C");
1358
1359         return toByteString(followerSnapshot);
1360     }
1361
1362     @Override
1363     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
1364             ActorRef actorRef, RaftRPC rpc) throws Exception {
1365         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1366
1367         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1368         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1369     }
1370
1371     @Override
1372     protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
1373             throws Exception {
1374         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1375         assertEquals("isSuccess", true, reply.isSuccess());
1376     }
1377 }