Migrate most of CDS to use java.util.Optional
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.spy;
21 import static org.mockito.Mockito.verify;
22
23 import akka.actor.ActorRef;
24 import akka.dispatch.Dispatchers;
25 import akka.protobuf.ByteString;
26 import akka.testkit.TestActorRef;
27 import akka.testkit.javadsl.TestKit;
28 import com.google.common.base.Optional;
29 import com.google.common.base.Stopwatch;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.io.ByteSource;
33 import com.google.common.util.concurrent.Uninterruptibles;
34 import java.io.OutputStream;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Assert;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
48 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
49 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
50 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
51 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
52 import org.opendaylight.controller.cluster.raft.RaftActorContext;
53 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
54 import org.opendaylight.controller.cluster.raft.RaftVersions;
55 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
56 import org.opendaylight.controller.cluster.raft.VotingState;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
59 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
60 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
61 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
65 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
70 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
71 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
72 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
73 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
75 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
76 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
77 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
78 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
79 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
80 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
81 import scala.concurrent.duration.FiniteDuration;
82
83 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
84
85     private final ActorRef followerActor = actorFactory.createActor(
86             MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
87
88     private final ActorRef leaderActor = actorFactory.createActor(
89             MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
90
91     private Follower follower;
92
93     private final short payloadVersion = 5;
94
95     @Override
96     @After
97     public void tearDown() {
98         if (follower != null) {
99             follower.close();
100         }
101
102         super.tearDown();
103     }
104
105     @Override
106     protected Follower createBehavior(final RaftActorContext actorContext) {
107         return spy(new Follower(actorContext));
108     }
109
110     @Override
111     protected  MockRaftActorContext createActorContext() {
112         return createActorContext(followerActor);
113     }
114
115     @Override
116     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
117         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
118         context.setPayloadVersion(payloadVersion);
119         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
120             peerId -> leaderActor.path().toString());
121         return context;
122     }
123
124     @Test
125     public void testThatAnElectionTimeoutIsTriggered() {
126         MockRaftActorContext actorContext = createActorContext();
127         follower = new Follower(actorContext);
128
129         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
130                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
131     }
132
133     @Test
134     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
135         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
136
137         MockRaftActorContext context = createActorContext();
138         follower = new Follower(context);
139
140         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
141                 TimeUnit.MILLISECONDS);
142         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
143
144         assertTrue(raftBehavior instanceof Candidate);
145     }
146
147     @Test
148     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
149         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
150
151         MockRaftActorContext context = createActorContext();
152         ((DefaultConfigParamsImpl) context.getConfigParams())
153                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
154         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
155
156         follower = new Follower(context);
157         context.setCurrentBehavior(follower);
158
159         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
160                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
161         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
162                 -1, -1, (short) 1));
163
164         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
165         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
166         assertTrue(raftBehavior instanceof Follower);
167
168         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
169                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
170         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
171                 -1, -1, (short) 1));
172
173         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
174         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
175         assertTrue(raftBehavior instanceof Follower);
176     }
177
178     @Test
179     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
180         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
181
182         MockRaftActorContext context = createActorContext();
183         long term = 1000;
184         context.getTermInformation().update(term, null);
185
186         follower = createBehavior(context);
187
188         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
189
190         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
191
192         assertEquals("isVoteGranted", true, reply.isVoteGranted());
193         assertEquals("getTerm", term, reply.getTerm());
194         verify(follower).scheduleElection(any(FiniteDuration.class));
195     }
196
197     @Test
198     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
199         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
200
201         MockRaftActorContext context = createActorContext();
202         long term = 1000;
203         context.getTermInformation().update(term, "test");
204
205         follower = createBehavior(context);
206
207         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
208
209         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
210
211         assertEquals("isVoteGranted", false, reply.isVoteGranted());
212         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
213     }
214
215
216     @Test
217     public void testHandleFirstAppendEntries() {
218         logStart("testHandleFirstAppendEntries");
219
220         MockRaftActorContext context = createActorContext();
221         context.getReplicatedLog().clear(0,2);
222         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
223         context.getReplicatedLog().setSnapshotIndex(99);
224
225         List<ReplicatedLogEntry> entries = Arrays.asList(
226                 newReplicatedLogEntry(2, 101, "foo"));
227
228         Assert.assertEquals(1, context.getReplicatedLog().size());
229
230         // The new commitIndex is 101
231         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
232
233         follower = createBehavior(context);
234         follower.handleMessage(leaderActor, appendEntries);
235
236         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
237                 FollowerInitialSyncUpStatus.class);
238         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
239
240         assertFalse(syncStatus.isInitialSyncDone());
241         assertTrue("append entries reply should be true", reply.isSuccess());
242     }
243
244     @Test
245     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
246         logStart("testHandleFirstAppendEntries");
247
248         MockRaftActorContext context = createActorContext();
249
250         List<ReplicatedLogEntry> entries = Arrays.asList(
251                 newReplicatedLogEntry(2, 101, "foo"));
252
253         // The new commitIndex is 101
254         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
255
256         follower = createBehavior(context);
257         follower.handleMessage(leaderActor, appendEntries);
258
259         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
260                 FollowerInitialSyncUpStatus.class);
261         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
262
263         assertFalse(syncStatus.isInitialSyncDone());
264         assertFalse("append entries reply should be false", reply.isSuccess());
265     }
266
267     @Test
268     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
269         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
270
271         MockRaftActorContext context = createActorContext();
272         context.getReplicatedLog().clear(0,2);
273         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
274         context.getReplicatedLog().setSnapshotIndex(99);
275
276         List<ReplicatedLogEntry> entries = Arrays.asList(
277                 newReplicatedLogEntry(2, 101, "foo"));
278
279         // The new commitIndex is 101
280         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
281
282         follower = createBehavior(context);
283         follower.handleMessage(leaderActor, appendEntries);
284
285         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
286                 FollowerInitialSyncUpStatus.class);
287         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
288
289         assertFalse(syncStatus.isInitialSyncDone());
290         assertTrue("append entries reply should be true", reply.isSuccess());
291     }
292
293     @Test
294     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
295         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
296
297         MockRaftActorContext context = createActorContext();
298         context.getReplicatedLog().clear(0,2);
299         context.getReplicatedLog().setSnapshotIndex(100);
300
301         List<ReplicatedLogEntry> entries = Arrays.asList(
302                 newReplicatedLogEntry(2, 101, "foo"));
303
304         // The new commitIndex is 101
305         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
306
307         follower = createBehavior(context);
308         follower.handleMessage(leaderActor, appendEntries);
309
310         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
311                 FollowerInitialSyncUpStatus.class);
312         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
313
314         assertFalse(syncStatus.isInitialSyncDone());
315         assertTrue("append entries reply should be true", reply.isSuccess());
316     }
317
318     @Test
319     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
320         logStart(
321                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
322
323         MockRaftActorContext context = createActorContext();
324         context.getReplicatedLog().clear(0,2);
325         context.getReplicatedLog().setSnapshotIndex(100);
326
327         List<ReplicatedLogEntry> entries = Arrays.asList(
328                 newReplicatedLogEntry(2, 105, "foo"));
329
330         // The new commitIndex is 101
331         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
332
333         follower = createBehavior(context);
334         follower.handleMessage(leaderActor, appendEntries);
335
336         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
337                 FollowerInitialSyncUpStatus.class);
338         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
339
340         assertFalse(syncStatus.isInitialSyncDone());
341         assertFalse("append entries reply should be false", reply.isSuccess());
342     }
343
344     @Test
345     public void testHandleSyncUpAppendEntries() {
346         logStart("testHandleSyncUpAppendEntries");
347
348         MockRaftActorContext context = createActorContext();
349
350         List<ReplicatedLogEntry> entries = Arrays.asList(
351                 newReplicatedLogEntry(2, 101, "foo"));
352
353         // The new commitIndex is 101
354         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
355
356         follower = createBehavior(context);
357         follower.handleMessage(leaderActor, appendEntries);
358
359         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
360                 FollowerInitialSyncUpStatus.class);
361
362         assertFalse(syncStatus.isInitialSyncDone());
363
364         // Clear all the messages
365         MessageCollectorActor.clearMessages(followerActor);
366
367         context.setLastApplied(101);
368         context.setCommitIndex(101);
369         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
370
371         entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
372
373         // The new commitIndex is 101
374         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
375         follower.handleMessage(leaderActor, appendEntries);
376
377         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
378
379         assertTrue(syncStatus.isInitialSyncDone());
380
381         MessageCollectorActor.clearMessages(followerActor);
382
383         // Sending the same message again should not generate another message
384
385         follower.handleMessage(leaderActor, appendEntries);
386
387         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
388
389         assertNull(syncStatus);
390     }
391
392     @Test
393     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
394         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
395
396         MockRaftActorContext context = createActorContext();
397
398         List<ReplicatedLogEntry> entries = Arrays.asList(
399                 newReplicatedLogEntry(2, 101, "foo"));
400
401         // The new commitIndex is 101
402         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
403
404         follower = createBehavior(context);
405         follower.handleMessage(leaderActor, appendEntries);
406
407         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
408                 FollowerInitialSyncUpStatus.class);
409
410         assertFalse(syncStatus.isInitialSyncDone());
411
412         // Clear all the messages
413         MessageCollectorActor.clearMessages(followerActor);
414
415         context.setLastApplied(100);
416         setLastLogEntry(context, 1, 100,
417                 new MockRaftActorContext.MockPayload(""));
418
419         entries = Arrays.asList(
420                 newReplicatedLogEntry(2, 101, "foo"));
421
422         // leader-2 is becoming the leader now and it says the commitIndex is 45
423         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
424         follower.handleMessage(leaderActor, appendEntries);
425
426         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
427
428         // We get a new message saying initial status is not done
429         assertFalse(syncStatus.isInitialSyncDone());
430     }
431
432     @Test
433     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
434         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
435
436         MockRaftActorContext context = createActorContext();
437
438         List<ReplicatedLogEntry> entries = Arrays.asList(
439                 newReplicatedLogEntry(2, 101, "foo"));
440
441         // The new commitIndex is 101
442         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
443
444         follower = createBehavior(context);
445         follower.handleMessage(leaderActor, appendEntries);
446
447         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
448                 FollowerInitialSyncUpStatus.class);
449
450         assertFalse(syncStatus.isInitialSyncDone());
451
452         // Clear all the messages
453         MessageCollectorActor.clearMessages(followerActor);
454
455         context.setLastApplied(101);
456         context.setCommitIndex(101);
457         setLastLogEntry(context, 1, 101,
458                 new MockRaftActorContext.MockPayload(""));
459
460         entries = Arrays.asList(
461                 newReplicatedLogEntry(2, 101, "foo"));
462
463         // The new commitIndex is 101
464         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
465         follower.handleMessage(leaderActor, appendEntries);
466
467         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
468
469         assertTrue(syncStatus.isInitialSyncDone());
470
471         // Clear all the messages
472         MessageCollectorActor.clearMessages(followerActor);
473
474         context.setLastApplied(100);
475         setLastLogEntry(context, 1, 100,
476                 new MockRaftActorContext.MockPayload(""));
477
478         entries = Arrays.asList(
479                 newReplicatedLogEntry(2, 101, "foo"));
480
481         // leader-2 is becoming the leader now and it says the commitIndex is 45
482         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
483         follower.handleMessage(leaderActor, appendEntries);
484
485         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
486
487         // We get a new message saying initial status is not done
488         assertFalse(syncStatus.isInitialSyncDone());
489     }
490
491     /**
492      * This test verifies that when an AppendEntries RPC is received by a RaftActor
493      * with a commitIndex that is greater than what has been applied to the
494      * state machine of the RaftActor, the RaftActor applies the state and
495      * sets it current applied state to the commitIndex of the sender.
496      */
497     @Test
498     public void testHandleAppendEntriesWithNewerCommitIndex() {
499         logStart("testHandleAppendEntriesWithNewerCommitIndex");
500
501         MockRaftActorContext context = createActorContext();
502
503         context.setLastApplied(100);
504         setLastLogEntry(context, 1, 100,
505                 new MockRaftActorContext.MockPayload(""));
506         context.getReplicatedLog().setSnapshotIndex(99);
507
508         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
509                 newReplicatedLogEntry(2, 101, "foo"));
510
511         // The new commitIndex is 101
512         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
513
514         follower = createBehavior(context);
515         follower.handleMessage(leaderActor, appendEntries);
516
517         assertEquals("getLastApplied", 101L, context.getLastApplied());
518     }
519
520     /**
521      * This test verifies that when an AppendEntries is received with a prevLogTerm
522      * which does not match the term that is in RaftActors log entry at prevLogIndex
523      * then the RaftActor does not change it's state and it returns a failure.
524      */
525     @Test
526     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
527         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
528
529         MockRaftActorContext context = createActorContext();
530
531         AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
532
533         follower = createBehavior(context);
534
535         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
536
537         Assert.assertSame(follower, newBehavior);
538
539         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
540                 AppendEntriesReply.class);
541
542         assertEquals("isSuccess", false, reply.isSuccess());
543     }
544
545     @Test
546     public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
547         logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
548
549         MockRaftActorContext context = createActorContext();
550         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
551         context.getReplicatedLog().setSnapshotIndex(4);
552         context.getReplicatedLog().setSnapshotTerm(3);
553
554         AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
555
556         follower = createBehavior(context);
557
558         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
559
560         Assert.assertSame(follower, newBehavior);
561
562         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
563
564         assertEquals("isSuccess", true, reply.isSuccess());
565     }
566
567     /**
568      * This test verifies that when a new AppendEntries message is received with
569      * new entries and the logs of the sender and receiver match that the new
570      * entries get added to the log and the log is incremented by the number of
571      * entries received in appendEntries.
572      */
573     @Test
574     public void testHandleAppendEntriesAddNewEntries() {
575         logStart("testHandleAppendEntriesAddNewEntries");
576
577         MockRaftActorContext context = createActorContext();
578
579         // First set the receivers term to lower number
580         context.getTermInformation().update(1, "test");
581
582         // Prepare the receivers log
583         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
584         log.append(newReplicatedLogEntry(1, 0, "zero"));
585         log.append(newReplicatedLogEntry(1, 1, "one"));
586         log.append(newReplicatedLogEntry(1, 2, "two"));
587
588         context.setReplicatedLog(log);
589
590         // Prepare the entries to be sent with AppendEntries
591         List<ReplicatedLogEntry> entries = new ArrayList<>();
592         entries.add(newReplicatedLogEntry(1, 3, "three"));
593         entries.add(newReplicatedLogEntry(1, 4, "four"));
594
595         // Send appendEntries with the same term as was set on the receiver
596         // before the new behavior was created (1 in this case)
597         // This will not work for a Candidate because as soon as a Candidate
598         // is created it increments the term
599         short leaderPayloadVersion = 10;
600         String leaderId = "leader-1";
601         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
602
603         follower = createBehavior(context);
604
605         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
606
607         Assert.assertSame(follower, newBehavior);
608
609         assertEquals("Next index", 5, log.last().getIndex() + 1);
610         assertEquals("Entry 3", entries.get(0), log.get(3));
611         assertEquals("Entry 4", entries.get(1), log.get(4));
612
613         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
614         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
615
616         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
617     }
618
619     /**
620      * This test verifies that when a new AppendEntries message is received with
621      * new entries and the logs of the sender and receiver are out-of-sync that
622      * the log is first corrected by removing the out of sync entries from the
623      * log and then adding in the new entries sent with the AppendEntries message.
624      */
625     @Test
626     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
627         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
628
629         MockRaftActorContext context = createActorContext();
630
631         // First set the receivers term to lower number
632         context.getTermInformation().update(1, "test");
633
634         // Prepare the receivers log
635         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
636         log.append(newReplicatedLogEntry(1, 0, "zero"));
637         log.append(newReplicatedLogEntry(1, 1, "one"));
638         log.append(newReplicatedLogEntry(1, 2, "two"));
639
640         context.setReplicatedLog(log);
641
642         // Prepare the entries to be sent with AppendEntries
643         List<ReplicatedLogEntry> entries = new ArrayList<>();
644         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
645         entries.add(newReplicatedLogEntry(2, 3, "three"));
646
647         // Send appendEntries with the same term as was set on the receiver
648         // before the new behavior was created (1 in this case)
649         // This will not work for a Candidate because as soon as a Candidate
650         // is created it increments the term
651         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
652
653         follower = createBehavior(context);
654
655         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
656
657         Assert.assertSame(follower, newBehavior);
658
659         // The entry at index 2 will be found out-of-sync with the leader
660         // and will be removed
661         // Then the two new entries will be added to the log
662         // Thus making the log to have 4 entries
663         assertEquals("Next index", 4, log.last().getIndex() + 1);
664         //assertEquals("Entry 2", entries.get(0), log.get(2));
665
666         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
667
668         // Check that the entry at index 2 has the new data
669         assertEquals("Entry 2", entries.get(0), log.get(2));
670
671         assertEquals("Entry 3", entries.get(1), log.get(3));
672
673         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
674     }
675
676     @Test
677     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
678         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
679
680         MockRaftActorContext context = createActorContext();
681
682         // First set the receivers term to lower number
683         context.getTermInformation().update(1, "test");
684
685         // Prepare the receivers log
686         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
687         log.append(newReplicatedLogEntry(1, 0, "zero"));
688         log.append(newReplicatedLogEntry(1, 1, "one"));
689         log.append(newReplicatedLogEntry(1, 2, "two"));
690
691         context.setReplicatedLog(log);
692
693         // Prepare the entries to be sent with AppendEntries
694         List<ReplicatedLogEntry> entries = new ArrayList<>();
695         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
696         entries.add(newReplicatedLogEntry(2, 3, "three"));
697
698         // Send appendEntries with the same term as was set on the receiver
699         // before the new behavior was created (1 in this case)
700         // This will not work for a Candidate because as soon as a Candidate
701         // is created it increments the term
702         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
703
704         context.setRaftPolicy(createRaftPolicy(false, true));
705         follower = createBehavior(context);
706
707         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
708
709         Assert.assertSame(follower, newBehavior);
710
711         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
712     }
713
714     @Test
715     public void testHandleAppendEntriesPreviousLogEntryMissing() {
716         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
717
718         final MockRaftActorContext context = createActorContext();
719
720         // Prepare the receivers log
721         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
722         log.append(newReplicatedLogEntry(1, 0, "zero"));
723         log.append(newReplicatedLogEntry(1, 1, "one"));
724         log.append(newReplicatedLogEntry(1, 2, "two"));
725
726         context.setReplicatedLog(log);
727
728         // Prepare the entries to be sent with AppendEntries
729         List<ReplicatedLogEntry> entries = new ArrayList<>();
730         entries.add(newReplicatedLogEntry(1, 4, "four"));
731
732         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
733
734         follower = createBehavior(context);
735
736         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
737
738         Assert.assertSame(follower, newBehavior);
739
740         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
741     }
742
743     @Test
744     public void testHandleAppendEntriesWithExistingLogEntry() {
745         logStart("testHandleAppendEntriesWithExistingLogEntry");
746
747         MockRaftActorContext context = createActorContext();
748
749         context.getTermInformation().update(1, "test");
750
751         // Prepare the receivers log
752         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
753         log.append(newReplicatedLogEntry(1, 0, "zero"));
754         log.append(newReplicatedLogEntry(1, 1, "one"));
755
756         context.setReplicatedLog(log);
757
758         // Send the last entry again.
759         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
760
761         follower = createBehavior(context);
762
763         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
764
765         assertEquals("Next index", 2, log.last().getIndex() + 1);
766         assertEquals("Entry 1", entries.get(0), log.get(1));
767
768         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
769
770         // Send the last entry again and also a new one.
771
772         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
773
774         MessageCollectorActor.clearMessages(leaderActor);
775         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
776
777         assertEquals("Next index", 3, log.last().getIndex() + 1);
778         assertEquals("Entry 1", entries.get(0), log.get(1));
779         assertEquals("Entry 2", entries.get(1), log.get(2));
780
781         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
782     }
783
784     @Test
785     public void testHandleAppendEntriesAfterInstallingSnapshot() {
786         logStart("testHandleAppendAfterInstallingSnapshot");
787
788         MockRaftActorContext context = createActorContext();
789
790         // Prepare the receivers log
791         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
792
793         // Set up a log as if it has been snapshotted
794         log.setSnapshotIndex(3);
795         log.setSnapshotTerm(1);
796
797         context.setReplicatedLog(log);
798
799         // Prepare the entries to be sent with AppendEntries
800         List<ReplicatedLogEntry> entries = new ArrayList<>();
801         entries.add(newReplicatedLogEntry(1, 4, "four"));
802
803         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
804
805         follower = createBehavior(context);
806
807         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
808
809         Assert.assertSame(follower, newBehavior);
810
811         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
812     }
813
814     /**
815      * This test verifies that when InstallSnapshot is received by
816      * the follower its applied correctly.
817      */
818     @Test
819     public void testHandleInstallSnapshot() {
820         logStart("testHandleInstallSnapshot");
821
822         MockRaftActorContext context = createActorContext();
823         context.getTermInformation().update(1, "leader");
824
825         follower = createBehavior(context);
826
827         ByteString bsSnapshot = createSnapshot();
828         int offset = 0;
829         int snapshotLength = bsSnapshot.size();
830         int chunkSize = 50;
831         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
832         int lastIncludedIndex = 1;
833         int chunkIndex = 1;
834         InstallSnapshot lastInstallSnapshot = null;
835
836         for (int i = 0; i < totalChunks; i++) {
837             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
838             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
839                     chunkData, chunkIndex, totalChunks);
840             follower.handleMessage(leaderActor, lastInstallSnapshot);
841             offset = offset + 50;
842             lastIncludedIndex++;
843             chunkIndex++;
844         }
845
846         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
847                 ApplySnapshot.class);
848         Snapshot snapshot = applySnapshot.getSnapshot();
849         assertNotNull(lastInstallSnapshot);
850         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
851         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
852                 snapshot.getLastAppliedTerm());
853         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
854                 snapshot.getLastAppliedIndex());
855         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
856         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
857         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
858         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
859         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
860         applySnapshot.getCallback().onSuccess();
861
862         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
863                 leaderActor, InstallSnapshotReply.class);
864         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
865
866         chunkIndex = 1;
867         for (InstallSnapshotReply reply: replies) {
868             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
869             assertEquals("getTerm", 1, reply.getTerm());
870             assertEquals("isSuccess", true, reply.isSuccess());
871             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
872         }
873
874         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
875     }
876
877     /**
878      * Verify that when an AppendEntries is sent to a follower during a snapshot install
879      * the Follower short-circuits the processing of the AppendEntries message.
880      */
881     @Test
882     public void testReceivingAppendEntriesDuringInstallSnapshot() {
883         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
884
885         MockRaftActorContext context = createActorContext();
886
887         follower = createBehavior(context);
888
889         ByteString bsSnapshot  = createSnapshot();
890         int snapshotLength = bsSnapshot.size();
891         int chunkSize = 50;
892         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
893         int lastIncludedIndex = 1;
894
895         // Check that snapshot installation is not in progress
896         assertNull(follower.getSnapshotTracker());
897
898         // Make sure that we have more than 1 chunk to send
899         assertTrue(totalChunks > 1);
900
901         // Send an install snapshot with the first chunk to start the process of installing a snapshot
902         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
903         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
904                 chunkData, 1, totalChunks));
905
906         // Check if snapshot installation is in progress now
907         assertNotNull(follower.getSnapshotTracker());
908
909         // Send an append entry
910         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
911                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
912
913         follower.handleMessage(leaderActor, appendEntries);
914
915         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
916         assertEquals("isSuccess", true, reply.isSuccess());
917         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
918         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
919         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
920
921         assertNotNull(follower.getSnapshotTracker());
922     }
923
924     @Test
925     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
926         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
927
928         MockRaftActorContext context = createActorContext();
929
930         follower = createBehavior(context);
931
932         ByteString bsSnapshot  = createSnapshot();
933         int snapshotLength = bsSnapshot.size();
934         int chunkSize = 50;
935         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
936         int lastIncludedIndex = 1;
937
938         // Check that snapshot installation is not in progress
939         assertNull(follower.getSnapshotTracker());
940
941         // Make sure that we have more than 1 chunk to send
942         assertTrue(totalChunks > 1);
943
944         // Send an install snapshot with the first chunk to start the process of installing a snapshot
945         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
946         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
947                 chunkData, 1, totalChunks));
948
949         // Check if snapshot installation is in progress now
950         assertNotNull(follower.getSnapshotTracker());
951
952         // Send appendEntries with a new term and leader.
953         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
954                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
955
956         follower.handleMessage(leaderActor, appendEntries);
957
958         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
959         assertEquals("isSuccess", true, reply.isSuccess());
960         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
961         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
962         assertEquals("getTerm", 2, reply.getTerm());
963
964         assertNull(follower.getSnapshotTracker());
965     }
966
967     @Test
968     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
969         logStart("testInitialSyncUpWithHandleInstallSnapshot");
970
971         MockRaftActorContext context = createActorContext();
972         context.setCommitIndex(-1);
973
974         follower = createBehavior(context);
975
976         ByteString bsSnapshot  = createSnapshot();
977         int offset = 0;
978         int snapshotLength = bsSnapshot.size();
979         int chunkSize = 50;
980         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
981         int lastIncludedIndex = 1;
982         int chunkIndex = 1;
983         InstallSnapshot lastInstallSnapshot = null;
984
985         for (int i = 0; i < totalChunks; i++) {
986             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
987             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
988                     chunkData, chunkIndex, totalChunks);
989             follower.handleMessage(leaderActor, lastInstallSnapshot);
990             offset = offset + 50;
991             lastIncludedIndex++;
992             chunkIndex++;
993         }
994
995         FollowerInitialSyncUpStatus syncStatus =
996                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
997
998         assertFalse(syncStatus.isInitialSyncDone());
999
1000         // Clear all the messages
1001         MessageCollectorActor.clearMessages(followerActor);
1002
1003         context.setLastApplied(101);
1004         context.setCommitIndex(101);
1005         setLastLogEntry(context, 1, 101,
1006                 new MockRaftActorContext.MockPayload(""));
1007
1008         List<ReplicatedLogEntry> entries = Arrays.asList(
1009                 newReplicatedLogEntry(2, 101, "foo"));
1010
1011         // The new commitIndex is 101
1012         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
1013         follower.handleMessage(leaderActor, appendEntries);
1014
1015         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1016
1017         assertTrue(syncStatus.isInitialSyncDone());
1018     }
1019
1020     @Test
1021     public void testHandleOutOfSequenceInstallSnapshot() {
1022         logStart("testHandleOutOfSequenceInstallSnapshot");
1023
1024         MockRaftActorContext context = createActorContext();
1025
1026         follower = createBehavior(context);
1027
1028         ByteString bsSnapshot = createSnapshot();
1029
1030         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1031                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1032         follower.handleMessage(leaderActor, installSnapshot);
1033
1034         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1035                 InstallSnapshotReply.class);
1036
1037         assertEquals("isSuccess", false, reply.isSuccess());
1038         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1039         assertEquals("getTerm", 1, reply.getTerm());
1040         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1041
1042         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1043     }
1044
1045     @Test
1046     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1047         MockRaftActorContext context = createActorContext();
1048
1049         Stopwatch stopwatch = Stopwatch.createStarted();
1050
1051         follower = createBehavior(context);
1052
1053         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1054
1055         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1056
1057         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1058
1059         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1060         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1061     }
1062
1063     @Test
1064     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1065         MockRaftActorContext context = createActorContext();
1066         context.setConfigParams(new DefaultConfigParamsImpl() {
1067             @Override
1068             public FiniteDuration getElectionTimeOutInterval() {
1069                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1070             }
1071         });
1072
1073         context.setRaftPolicy(createRaftPolicy(false, false));
1074
1075         follower = createBehavior(context);
1076
1077         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1078         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1079         assertSame("handleMessage result", follower, newBehavior);
1080     }
1081
1082     @Test
1083     public void testFollowerSchedulesElectionIfNonVoting() {
1084         MockRaftActorContext context = createActorContext();
1085         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1086         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1087                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1088         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1089
1090         follower = new Follower(context, "leader", (short)1);
1091
1092         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1093                 ElectionTimeout.class);
1094         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1095         assertSame("handleMessage result", follower, newBehavior);
1096         assertNull("Expected null leaderId", follower.getLeaderId());
1097     }
1098
1099     @Test
1100     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1101         MockRaftActorContext context = createActorContext();
1102         follower = createBehavior(context);
1103         follower.handleMessage(leaderActor, new RaftRPC() {
1104             private static final long serialVersionUID = 1L;
1105
1106             @Override
1107             public long getTerm() {
1108                 return 100;
1109             }
1110         });
1111         verify(follower).scheduleElection(any(FiniteDuration.class));
1112     }
1113
1114     @Test
1115     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1116         MockRaftActorContext context = createActorContext();
1117         follower = createBehavior(context);
1118         follower.handleMessage(leaderActor, "non-raft-rpc");
1119         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1120     }
1121
1122     @Test
1123     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1124         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1125         logStart(id);
1126
1127         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1128
1129         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1130         config.setSnapshotBatchCount(2);
1131         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1132
1133         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1134         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1135         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1136                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1137         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1138                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1139         followerRaftActor.set(followerActorRef.underlyingActor());
1140         followerRaftActor.get().waitForInitializeBehaviorComplete();
1141
1142         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1143         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1144         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1145
1146         List<ReplicatedLogEntry> entries = Arrays.asList(
1147                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1148
1149         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1150
1151         followerActorRef.tell(appendEntries, leaderActor);
1152
1153         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1154         assertEquals("isSuccess", true, reply.isSuccess());
1155
1156         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1157
1158         InMemoryJournal.waitForDeleteMessagesComplete(id);
1159         InMemoryJournal.waitForWriteMessagesComplete(id);
1160         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1161         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1162         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1163         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1164         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1165         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1166         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1167
1168         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1169         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1170         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1171         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1172         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1173         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1174                 MockRaftActor.fromState(snapshot.getState()));
1175     }
1176
1177     @Test
1178     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1179         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1180         logStart(id);
1181
1182         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1183
1184         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1185         config.setSnapshotBatchCount(2);
1186         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1187
1188         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1189         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1190         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1191                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1192         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1193                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1194         followerRaftActor.set(followerActorRef.underlyingActor());
1195         followerRaftActor.get().waitForInitializeBehaviorComplete();
1196
1197         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1198         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1199         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1200
1201         List<ReplicatedLogEntry> entries = Arrays.asList(
1202                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1203                 newReplicatedLogEntry(1, 2, "three"));
1204
1205         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1206
1207         followerActorRef.tell(appendEntries, leaderActor);
1208
1209         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1210         assertEquals("isSuccess", true, reply.isSuccess());
1211
1212         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1213
1214         InMemoryJournal.waitForDeleteMessagesComplete(id);
1215         InMemoryJournal.waitForWriteMessagesComplete(id);
1216         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1217         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1218         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1219         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1220         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1221         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1222         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1223
1224         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1225         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1226         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1227         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1228         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1229         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1230                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1231
1232         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1233         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1234
1235         // Reinstate the actor from persistence
1236
1237         actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1238
1239         followerActorRef = actorFactory.createTestActor(builder.props()
1240                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1241         followerRaftActor.set(followerActorRef.underlyingActor());
1242         followerRaftActor.get().waitForInitializeBehaviorComplete();
1243
1244         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1245         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1246         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1247         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1248         assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1249                 entries.get(2).getData()), followerRaftActor.get().getState());
1250     }
1251
1252     @Test
1253     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1254         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1255         logStart(id);
1256
1257         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1258
1259         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1260         config.setSnapshotBatchCount(1);
1261         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1262
1263         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1264         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1265         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1266                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1267         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1268                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1269         followerRaftActor.set(followerActorRef.underlyingActor());
1270         followerRaftActor.get().waitForInitializeBehaviorComplete();
1271
1272         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1273         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1274         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1275
1276         List<ReplicatedLogEntry> entries = Arrays.asList(
1277                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1278                 newReplicatedLogEntry(1, 2, "three"));
1279
1280         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1281
1282         followerActorRef.tell(appendEntries, leaderActor);
1283
1284         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1285         assertEquals("isSuccess", true, reply.isSuccess());
1286
1287         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1288
1289         InMemoryJournal.waitForDeleteMessagesComplete(id);
1290         InMemoryJournal.waitForWriteMessagesComplete(id);
1291         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1292         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1293         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1294         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1295         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1296         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1297         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1298
1299         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1300         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1301         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1302         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1303         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1304         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1305         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1306         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1307                 MockRaftActor.fromState(snapshot.getState()));
1308     }
1309
1310     @Test
1311     public void testNeedsLeaderAddress() {
1312         logStart("testNeedsLeaderAddress");
1313
1314         MockRaftActorContext context = createActorContext();
1315         context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
1316         context.addToPeers("leader", null, VotingState.VOTING);
1317         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1318
1319         follower = createBehavior(context);
1320
1321         follower.handleMessage(leaderActor,
1322                 new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
1323
1324         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1325         assertTrue(reply.isNeedsLeaderAddress());
1326         MessageCollectorActor.clearMessages(leaderActor);
1327
1328         PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1329         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1330
1331         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
1332                 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1333
1334         reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1335         assertFalse(reply.isNeedsLeaderAddress());
1336
1337         verify(mockResolver).setResolved("leader", leaderActor.path().toString());
1338     }
1339
1340     @SuppressWarnings("checkstyle:IllegalCatch")
1341     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1342             final AtomicReference<MockRaftActor> followerRaftActor) {
1343         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1344             @Override
1345             public void createSnapshot(final ActorRef actorRef,
1346                     final java.util.Optional<OutputStream> installSnapshotStream) {
1347                 try {
1348                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1349                             installSnapshotStream), actorRef);
1350                 } catch (RuntimeException e) {
1351                     throw e;
1352                 } catch (Exception e) {
1353                     throw new RuntimeException(e);
1354                 }
1355             }
1356
1357             @Override
1358             public void applySnapshot(final State snapshotState) {
1359             }
1360
1361             @Override
1362             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1363                 throw new UnsupportedOperationException();
1364             }
1365         };
1366         return snapshotCohort;
1367     }
1368
1369     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1370         int snapshotLength = bs.size();
1371         int start = offset;
1372         int size = chunkSize;
1373         if (chunkSize > snapshotLength) {
1374             size = snapshotLength;
1375         } else {
1376             if (start + chunkSize > snapshotLength) {
1377                 size = snapshotLength - start;
1378             }
1379         }
1380
1381         byte[] nextChunk = new byte[size];
1382         bs.copyTo(nextChunk, start, 0, size);
1383         return nextChunk;
1384     }
1385
1386     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1387             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1388         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1389     }
1390
1391     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1392             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1393             final boolean expForceInstallSnapshot) {
1394
1395         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1396                 AppendEntriesReply.class);
1397
1398         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1399         assertEquals("getTerm", expTerm, reply.getTerm());
1400         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1401         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1402         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1403         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1404         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1405         assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1406     }
1407
1408
1409     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1410         return new SimpleReplicatedLogEntry(index, term,
1411                 new MockRaftActorContext.MockPayload(data));
1412     }
1413
1414     private ByteString createSnapshot() {
1415         HashMap<String, String> followerSnapshot = new HashMap<>();
1416         followerSnapshot.put("1", "A");
1417         followerSnapshot.put("2", "B");
1418         followerSnapshot.put("3", "C");
1419
1420         return toByteString(followerSnapshot);
1421     }
1422
1423     @Override
1424     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1425             final ActorRef actorRef, final RaftRPC rpc) {
1426         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1427
1428         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1429         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1430     }
1431
1432     @Override
1433     protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1434         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1435         assertEquals("isSuccess", true, reply.isSuccess());
1436     }
1437 }