Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.spy;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.dispatch.Dispatchers;
24 import akka.protobuf.ByteString;
25 import akka.testkit.TestActorRef;
26 import akka.testkit.javadsl.TestKit;
27 import com.google.common.base.Stopwatch;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.io.ByteSource;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.OutputStream;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Assert;
43 import org.junit.Test;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.MockRaftActor;
46 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
47 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
48 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
49 import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
50 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
51 import org.opendaylight.controller.cluster.raft.RaftActorContext;
52 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
53 import org.opendaylight.controller.cluster.raft.RaftVersions;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.VotingState;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
58 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
59 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
60 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
64 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
66 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
68 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
69 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
70 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
71 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
72 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
73 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
74 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
75 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
76 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
77 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
78 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
83
84     private final ActorRef followerActor = actorFactory.createActor(
85             MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
86
87     private final ActorRef leaderActor = actorFactory.createActor(
88             MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
89
90     private Follower follower;
91
92     private final short payloadVersion = 5;
93
94     @Override
95     @After
96     public void tearDown() {
97         if (follower != null) {
98             follower.close();
99         }
100
101         super.tearDown();
102     }
103
104     @Override
105     protected Follower createBehavior(final RaftActorContext actorContext) {
106         return spy(new Follower(actorContext));
107     }
108
109     @Override
110     protected  MockRaftActorContext createActorContext() {
111         return createActorContext(followerActor);
112     }
113
114     @Override
115     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
116         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
117         context.setPayloadVersion(payloadVersion);
118         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
119             peerId -> leaderActor.path().toString());
120         return context;
121     }
122
123     @Test
124     public void testThatAnElectionTimeoutIsTriggered() {
125         MockRaftActorContext actorContext = createActorContext();
126         follower = new Follower(actorContext);
127
128         MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
129                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
130     }
131
132     @Test
133     public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
134         logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
135
136         MockRaftActorContext context = createActorContext();
137         follower = new Follower(context);
138
139         Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
140                 TimeUnit.MILLISECONDS);
141         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
142
143         assertTrue(raftBehavior instanceof Candidate);
144     }
145
146     @Test
147     public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
148         logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
149
150         MockRaftActorContext context = createActorContext();
151         ((DefaultConfigParamsImpl) context.getConfigParams())
152                 .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
153         ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
154
155         follower = new Follower(context);
156         context.setCurrentBehavior(follower);
157
158         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
159                 .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
160         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
161                 -1, -1, (short) 1));
162
163         Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
164         RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
165         assertTrue(raftBehavior instanceof Follower);
166
167         Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
168                 .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
169         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
170                 -1, -1, (short) 1));
171
172         Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
173         raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
174         assertTrue(raftBehavior instanceof Follower);
175     }
176
177     @Test
178     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
179         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
180
181         MockRaftActorContext context = createActorContext();
182         long term = 1000;
183         context.getTermInformation().update(term, null);
184
185         follower = createBehavior(context);
186
187         follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
188
189         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
190
191         assertEquals("isVoteGranted", true, reply.isVoteGranted());
192         assertEquals("getTerm", term, reply.getTerm());
193         verify(follower).scheduleElection(any(FiniteDuration.class));
194     }
195
196     @Test
197     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
198         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
199
200         MockRaftActorContext context = createActorContext();
201         long term = 1000;
202         context.getTermInformation().update(term, "test");
203
204         follower = createBehavior(context);
205
206         follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
207
208         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
209
210         assertEquals("isVoteGranted", false, reply.isVoteGranted());
211         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
212     }
213
214
215     @Test
216     public void testHandleFirstAppendEntries() {
217         logStart("testHandleFirstAppendEntries");
218
219         MockRaftActorContext context = createActorContext();
220         context.getReplicatedLog().clear(0,2);
221         context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
222         context.getReplicatedLog().setSnapshotIndex(99);
223
224         List<ReplicatedLogEntry> entries = Arrays.asList(
225                 newReplicatedLogEntry(2, 101, "foo"));
226
227         Assert.assertEquals(1, context.getReplicatedLog().size());
228
229         // The new commitIndex is 101
230         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
231
232         follower = createBehavior(context);
233         follower.handleMessage(leaderActor, appendEntries);
234
235         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
236                 FollowerInitialSyncUpStatus.class);
237         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
238
239         assertFalse(syncStatus.isInitialSyncDone());
240         assertTrue("append entries reply should be true", reply.isSuccess());
241     }
242
243     @Test
244     public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
245         logStart("testHandleFirstAppendEntries");
246
247         MockRaftActorContext context = createActorContext();
248
249         List<ReplicatedLogEntry> entries = Arrays.asList(
250                 newReplicatedLogEntry(2, 101, "foo"));
251
252         // The new commitIndex is 101
253         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
254
255         follower = createBehavior(context);
256         follower.handleMessage(leaderActor, appendEntries);
257
258         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
259                 FollowerInitialSyncUpStatus.class);
260         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
261
262         assertFalse(syncStatus.isInitialSyncDone());
263         assertFalse("append entries reply should be false", reply.isSuccess());
264     }
265
266     @Test
267     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
268         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
269
270         MockRaftActorContext context = createActorContext();
271         context.getReplicatedLog().clear(0,2);
272         context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
273         context.getReplicatedLog().setSnapshotIndex(99);
274
275         List<ReplicatedLogEntry> entries = Arrays.asList(
276                 newReplicatedLogEntry(2, 101, "foo"));
277
278         // The new commitIndex is 101
279         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
280
281         follower = createBehavior(context);
282         follower.handleMessage(leaderActor, appendEntries);
283
284         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
285                 FollowerInitialSyncUpStatus.class);
286         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
287
288         assertFalse(syncStatus.isInitialSyncDone());
289         assertTrue("append entries reply should be true", reply.isSuccess());
290     }
291
292     @Test
293     public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
294         logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
295
296         MockRaftActorContext context = createActorContext();
297         context.getReplicatedLog().clear(0,2);
298         context.getReplicatedLog().setSnapshotIndex(100);
299
300         List<ReplicatedLogEntry> entries = Arrays.asList(
301                 newReplicatedLogEntry(2, 101, "foo"));
302
303         // The new commitIndex is 101
304         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
305
306         follower = createBehavior(context);
307         follower.handleMessage(leaderActor, appendEntries);
308
309         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
310                 FollowerInitialSyncUpStatus.class);
311         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
312
313         assertFalse(syncStatus.isInitialSyncDone());
314         assertTrue("append entries reply should be true", reply.isSuccess());
315     }
316
317     @Test
318     public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
319         logStart(
320                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
321
322         MockRaftActorContext context = createActorContext();
323         context.getReplicatedLog().clear(0,2);
324         context.getReplicatedLog().setSnapshotIndex(100);
325
326         List<ReplicatedLogEntry> entries = Arrays.asList(
327                 newReplicatedLogEntry(2, 105, "foo"));
328
329         // The new commitIndex is 101
330         AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
331
332         follower = createBehavior(context);
333         follower.handleMessage(leaderActor, appendEntries);
334
335         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
336                 FollowerInitialSyncUpStatus.class);
337         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
338
339         assertFalse(syncStatus.isInitialSyncDone());
340         assertFalse("append entries reply should be false", reply.isSuccess());
341     }
342
343     @Test
344     public void testHandleSyncUpAppendEntries() {
345         logStart("testHandleSyncUpAppendEntries");
346
347         MockRaftActorContext context = createActorContext();
348
349         List<ReplicatedLogEntry> entries = Arrays.asList(
350                 newReplicatedLogEntry(2, 101, "foo"));
351
352         // The new commitIndex is 101
353         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
354
355         follower = createBehavior(context);
356         follower.handleMessage(leaderActor, appendEntries);
357
358         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
359                 FollowerInitialSyncUpStatus.class);
360
361         assertFalse(syncStatus.isInitialSyncDone());
362
363         // Clear all the messages
364         MessageCollectorActor.clearMessages(followerActor);
365
366         context.setLastApplied(101);
367         context.setCommitIndex(101);
368         setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
369
370         entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
371
372         // The new commitIndex is 101
373         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
374         follower.handleMessage(leaderActor, appendEntries);
375
376         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
377
378         assertTrue(syncStatus.isInitialSyncDone());
379
380         MessageCollectorActor.clearMessages(followerActor);
381
382         // Sending the same message again should not generate another message
383
384         follower.handleMessage(leaderActor, appendEntries);
385
386         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
387
388         assertNull(syncStatus);
389     }
390
391     @Test
392     public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
393         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
394
395         MockRaftActorContext context = createActorContext();
396
397         List<ReplicatedLogEntry> entries = Arrays.asList(
398                 newReplicatedLogEntry(2, 101, "foo"));
399
400         // The new commitIndex is 101
401         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
402
403         follower = createBehavior(context);
404         follower.handleMessage(leaderActor, appendEntries);
405
406         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
407                 FollowerInitialSyncUpStatus.class);
408
409         assertFalse(syncStatus.isInitialSyncDone());
410
411         // Clear all the messages
412         MessageCollectorActor.clearMessages(followerActor);
413
414         context.setLastApplied(100);
415         setLastLogEntry(context, 1, 100,
416                 new MockRaftActorContext.MockPayload(""));
417
418         entries = Arrays.asList(
419                 newReplicatedLogEntry(2, 101, "foo"));
420
421         // leader-2 is becoming the leader now and it says the commitIndex is 45
422         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
423         follower.handleMessage(leaderActor, appendEntries);
424
425         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
426
427         // We get a new message saying initial status is not done
428         assertFalse(syncStatus.isInitialSyncDone());
429     }
430
431     @Test
432     public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
433         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
434
435         MockRaftActorContext context = createActorContext();
436
437         List<ReplicatedLogEntry> entries = Arrays.asList(
438                 newReplicatedLogEntry(2, 101, "foo"));
439
440         // The new commitIndex is 101
441         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
442
443         follower = createBehavior(context);
444         follower.handleMessage(leaderActor, appendEntries);
445
446         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
447                 FollowerInitialSyncUpStatus.class);
448
449         assertFalse(syncStatus.isInitialSyncDone());
450
451         // Clear all the messages
452         MessageCollectorActor.clearMessages(followerActor);
453
454         context.setLastApplied(101);
455         context.setCommitIndex(101);
456         setLastLogEntry(context, 1, 101,
457                 new MockRaftActorContext.MockPayload(""));
458
459         entries = Arrays.asList(
460                 newReplicatedLogEntry(2, 101, "foo"));
461
462         // The new commitIndex is 101
463         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
464         follower.handleMessage(leaderActor, appendEntries);
465
466         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
467
468         assertTrue(syncStatus.isInitialSyncDone());
469
470         // Clear all the messages
471         MessageCollectorActor.clearMessages(followerActor);
472
473         context.setLastApplied(100);
474         setLastLogEntry(context, 1, 100,
475                 new MockRaftActorContext.MockPayload(""));
476
477         entries = Arrays.asList(
478                 newReplicatedLogEntry(2, 101, "foo"));
479
480         // leader-2 is becoming the leader now and it says the commitIndex is 45
481         appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
482         follower.handleMessage(leaderActor, appendEntries);
483
484         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
485
486         // We get a new message saying initial status is not done
487         assertFalse(syncStatus.isInitialSyncDone());
488     }
489
490     /**
491      * This test verifies that when an AppendEntries RPC is received by a RaftActor
492      * with a commitIndex that is greater than what has been applied to the
493      * state machine of the RaftActor, the RaftActor applies the state and
494      * sets it current applied state to the commitIndex of the sender.
495      */
496     @Test
497     public void testHandleAppendEntriesWithNewerCommitIndex() {
498         logStart("testHandleAppendEntriesWithNewerCommitIndex");
499
500         MockRaftActorContext context = createActorContext();
501
502         context.setLastApplied(100);
503         setLastLogEntry(context, 1, 100,
504                 new MockRaftActorContext.MockPayload(""));
505         context.getReplicatedLog().setSnapshotIndex(99);
506
507         List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
508                 newReplicatedLogEntry(2, 101, "foo"));
509
510         // The new commitIndex is 101
511         AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
512
513         follower = createBehavior(context);
514         follower.handleMessage(leaderActor, appendEntries);
515
516         assertEquals("getLastApplied", 101L, context.getLastApplied());
517     }
518
519     /**
520      * This test verifies that when an AppendEntries is received with a prevLogTerm
521      * which does not match the term that is in RaftActors log entry at prevLogIndex
522      * then the RaftActor does not change it's state and it returns a failure.
523      */
524     @Test
525     public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
526         logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
527
528         MockRaftActorContext context = createActorContext();
529
530         AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
531
532         follower = createBehavior(context);
533
534         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
535
536         Assert.assertSame(follower, newBehavior);
537
538         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
539                 AppendEntriesReply.class);
540
541         assertEquals("isSuccess", false, reply.isSuccess());
542     }
543
544     @Test
545     public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
546         logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
547
548         MockRaftActorContext context = createActorContext();
549         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
550         context.getReplicatedLog().setSnapshotIndex(4);
551         context.getReplicatedLog().setSnapshotTerm(3);
552
553         AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
554
555         follower = createBehavior(context);
556
557         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
558
559         Assert.assertSame(follower, newBehavior);
560
561         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
562
563         assertEquals("isSuccess", true, reply.isSuccess());
564     }
565
566     /**
567      * This test verifies that when a new AppendEntries message is received with
568      * new entries and the logs of the sender and receiver match that the new
569      * entries get added to the log and the log is incremented by the number of
570      * entries received in appendEntries.
571      */
572     @Test
573     public void testHandleAppendEntriesAddNewEntries() {
574         logStart("testHandleAppendEntriesAddNewEntries");
575
576         MockRaftActorContext context = createActorContext();
577
578         // First set the receivers term to lower number
579         context.getTermInformation().update(1, "test");
580
581         // Prepare the receivers log
582         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
583         log.append(newReplicatedLogEntry(1, 0, "zero"));
584         log.append(newReplicatedLogEntry(1, 1, "one"));
585         log.append(newReplicatedLogEntry(1, 2, "two"));
586
587         context.setReplicatedLog(log);
588
589         // Prepare the entries to be sent with AppendEntries
590         List<ReplicatedLogEntry> entries = new ArrayList<>();
591         entries.add(newReplicatedLogEntry(1, 3, "three"));
592         entries.add(newReplicatedLogEntry(1, 4, "four"));
593
594         // Send appendEntries with the same term as was set on the receiver
595         // before the new behavior was created (1 in this case)
596         // This will not work for a Candidate because as soon as a Candidate
597         // is created it increments the term
598         short leaderPayloadVersion = 10;
599         String leaderId = "leader-1";
600         AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
601
602         follower = createBehavior(context);
603
604         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
605
606         Assert.assertSame(follower, newBehavior);
607
608         assertEquals("Next index", 5, log.last().getIndex() + 1);
609         assertEquals("Entry 3", entries.get(0), log.get(3));
610         assertEquals("Entry 4", entries.get(1), log.get(4));
611
612         assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
613         assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
614
615         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
616     }
617
618     /**
619      * This test verifies that when a new AppendEntries message is received with
620      * new entries and the logs of the sender and receiver are out-of-sync that
621      * the log is first corrected by removing the out of sync entries from the
622      * log and then adding in the new entries sent with the AppendEntries message.
623      */
624     @Test
625     public void testHandleAppendEntriesCorrectReceiverLogEntries() {
626         logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
627
628         MockRaftActorContext context = createActorContext();
629
630         // First set the receivers term to lower number
631         context.getTermInformation().update(1, "test");
632
633         // Prepare the receivers log
634         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
635         log.append(newReplicatedLogEntry(1, 0, "zero"));
636         log.append(newReplicatedLogEntry(1, 1, "one"));
637         log.append(newReplicatedLogEntry(1, 2, "two"));
638
639         context.setReplicatedLog(log);
640
641         // Prepare the entries to be sent with AppendEntries
642         List<ReplicatedLogEntry> entries = new ArrayList<>();
643         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
644         entries.add(newReplicatedLogEntry(2, 3, "three"));
645
646         // Send appendEntries with the same term as was set on the receiver
647         // before the new behavior was created (1 in this case)
648         // This will not work for a Candidate because as soon as a Candidate
649         // is created it increments the term
650         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
651
652         follower = createBehavior(context);
653
654         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
655
656         Assert.assertSame(follower, newBehavior);
657
658         // The entry at index 2 will be found out-of-sync with the leader
659         // and will be removed
660         // Then the two new entries will be added to the log
661         // Thus making the log to have 4 entries
662         assertEquals("Next index", 4, log.last().getIndex() + 1);
663         //assertEquals("Entry 2", entries.get(0), log.get(2));
664
665         assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
666
667         // Check that the entry at index 2 has the new data
668         assertEquals("Entry 2", entries.get(0), log.get(2));
669
670         assertEquals("Entry 3", entries.get(1), log.get(3));
671
672         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
673     }
674
675     @Test
676     public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
677         logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
678
679         MockRaftActorContext context = createActorContext();
680
681         // First set the receivers term to lower number
682         context.getTermInformation().update(1, "test");
683
684         // Prepare the receivers log
685         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
686         log.append(newReplicatedLogEntry(1, 0, "zero"));
687         log.append(newReplicatedLogEntry(1, 1, "one"));
688         log.append(newReplicatedLogEntry(1, 2, "two"));
689
690         context.setReplicatedLog(log);
691
692         // Prepare the entries to be sent with AppendEntries
693         List<ReplicatedLogEntry> entries = new ArrayList<>();
694         entries.add(newReplicatedLogEntry(2, 2, "two-1"));
695         entries.add(newReplicatedLogEntry(2, 3, "three"));
696
697         // Send appendEntries with the same term as was set on the receiver
698         // before the new behavior was created (1 in this case)
699         // This will not work for a Candidate because as soon as a Candidate
700         // is created it increments the term
701         AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
702
703         context.setRaftPolicy(createRaftPolicy(false, true));
704         follower = createBehavior(context);
705
706         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
707
708         Assert.assertSame(follower, newBehavior);
709
710         expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
711     }
712
713     @Test
714     public void testHandleAppendEntriesPreviousLogEntryMissing() {
715         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
716
717         final MockRaftActorContext context = createActorContext();
718
719         // Prepare the receivers log
720         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
721         log.append(newReplicatedLogEntry(1, 0, "zero"));
722         log.append(newReplicatedLogEntry(1, 1, "one"));
723         log.append(newReplicatedLogEntry(1, 2, "two"));
724
725         context.setReplicatedLog(log);
726
727         // Prepare the entries to be sent with AppendEntries
728         List<ReplicatedLogEntry> entries = new ArrayList<>();
729         entries.add(newReplicatedLogEntry(1, 4, "four"));
730
731         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
732
733         follower = createBehavior(context);
734
735         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
736
737         Assert.assertSame(follower, newBehavior);
738
739         expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
740     }
741
742     @Test
743     public void testHandleAppendEntriesWithExistingLogEntry() {
744         logStart("testHandleAppendEntriesWithExistingLogEntry");
745
746         MockRaftActorContext context = createActorContext();
747
748         context.getTermInformation().update(1, "test");
749
750         // Prepare the receivers log
751         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
752         log.append(newReplicatedLogEntry(1, 0, "zero"));
753         log.append(newReplicatedLogEntry(1, 1, "one"));
754
755         context.setReplicatedLog(log);
756
757         // Send the last entry again.
758         List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
759
760         follower = createBehavior(context);
761
762         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
763
764         assertEquals("Next index", 2, log.last().getIndex() + 1);
765         assertEquals("Entry 1", entries.get(0), log.get(1));
766
767         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
768
769         // Send the last entry again and also a new one.
770
771         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
772
773         MessageCollectorActor.clearMessages(leaderActor);
774         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
775
776         assertEquals("Next index", 3, log.last().getIndex() + 1);
777         assertEquals("Entry 1", entries.get(0), log.get(1));
778         assertEquals("Entry 2", entries.get(1), log.get(2));
779
780         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
781     }
782
783     @Test
784     public void testHandleAppendEntriesAfterInstallingSnapshot() {
785         logStart("testHandleAppendAfterInstallingSnapshot");
786
787         MockRaftActorContext context = createActorContext();
788
789         // Prepare the receivers log
790         MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
791
792         // Set up a log as if it has been snapshotted
793         log.setSnapshotIndex(3);
794         log.setSnapshotTerm(1);
795
796         context.setReplicatedLog(log);
797
798         // Prepare the entries to be sent with AppendEntries
799         List<ReplicatedLogEntry> entries = new ArrayList<>();
800         entries.add(newReplicatedLogEntry(1, 4, "four"));
801
802         AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
803
804         follower = createBehavior(context);
805
806         RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
807
808         Assert.assertSame(follower, newBehavior);
809
810         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
811     }
812
813     /**
814      * This test verifies that when InstallSnapshot is received by
815      * the follower its applied correctly.
816      */
817     @Test
818     public void testHandleInstallSnapshot() {
819         logStart("testHandleInstallSnapshot");
820
821         MockRaftActorContext context = createActorContext();
822         context.getTermInformation().update(1, "leader");
823
824         follower = createBehavior(context);
825
826         ByteString bsSnapshot = createSnapshot();
827         int offset = 0;
828         int snapshotLength = bsSnapshot.size();
829         int chunkSize = 50;
830         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
831         int lastIncludedIndex = 1;
832         int chunkIndex = 1;
833         InstallSnapshot lastInstallSnapshot = null;
834
835         for (int i = 0; i < totalChunks; i++) {
836             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
837             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
838                     chunkData, chunkIndex, totalChunks);
839             follower.handleMessage(leaderActor, lastInstallSnapshot);
840             offset = offset + 50;
841             lastIncludedIndex++;
842             chunkIndex++;
843         }
844
845         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
846                 ApplySnapshot.class);
847         Snapshot snapshot = applySnapshot.getSnapshot();
848         assertNotNull(lastInstallSnapshot);
849         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
850         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
851                 snapshot.getLastAppliedTerm());
852         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
853                 snapshot.getLastAppliedIndex());
854         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
855         assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
856         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
857         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
858         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
859         applySnapshot.getCallback().onSuccess();
860
861         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
862                 leaderActor, InstallSnapshotReply.class);
863         assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
864
865         chunkIndex = 1;
866         for (InstallSnapshotReply reply: replies) {
867             assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
868             assertEquals("getTerm", 1, reply.getTerm());
869             assertEquals("isSuccess", true, reply.isSuccess());
870             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
871         }
872
873         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
874     }
875
876     /**
877      * Verify that when an AppendEntries is sent to a follower during a snapshot install
878      * the Follower short-circuits the processing of the AppendEntries message.
879      */
880     @Test
881     public void testReceivingAppendEntriesDuringInstallSnapshot() {
882         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
883
884         MockRaftActorContext context = createActorContext();
885
886         follower = createBehavior(context);
887
888         ByteString bsSnapshot  = createSnapshot();
889         int snapshotLength = bsSnapshot.size();
890         int chunkSize = 50;
891         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
892         int lastIncludedIndex = 1;
893
894         // Check that snapshot installation is not in progress
895         assertNull(follower.getSnapshotTracker());
896
897         // Make sure that we have more than 1 chunk to send
898         assertTrue(totalChunks > 1);
899
900         // Send an install snapshot with the first chunk to start the process of installing a snapshot
901         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
902         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
903                 chunkData, 1, totalChunks));
904
905         // Check if snapshot installation is in progress now
906         assertNotNull(follower.getSnapshotTracker());
907
908         // Send an append entry
909         AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
910                 Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
911
912         follower.handleMessage(leaderActor, appendEntries);
913
914         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
915         assertEquals("isSuccess", true, reply.isSuccess());
916         assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
917         assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
918         assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
919
920         assertNotNull(follower.getSnapshotTracker());
921     }
922
923     @Test
924     public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
925         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
926
927         MockRaftActorContext context = createActorContext();
928
929         follower = createBehavior(context);
930
931         ByteString bsSnapshot  = createSnapshot();
932         int snapshotLength = bsSnapshot.size();
933         int chunkSize = 50;
934         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
935         int lastIncludedIndex = 1;
936
937         // Check that snapshot installation is not in progress
938         assertNull(follower.getSnapshotTracker());
939
940         // Make sure that we have more than 1 chunk to send
941         assertTrue(totalChunks > 1);
942
943         // Send an install snapshot with the first chunk to start the process of installing a snapshot
944         byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
945         follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
946                 chunkData, 1, totalChunks));
947
948         // Check if snapshot installation is in progress now
949         assertNotNull(follower.getSnapshotTracker());
950
951         // Send appendEntries with a new term and leader.
952         AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
953                 Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
954
955         follower.handleMessage(leaderActor, appendEntries);
956
957         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
958         assertEquals("isSuccess", true, reply.isSuccess());
959         assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
960         assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
961         assertEquals("getTerm", 2, reply.getTerm());
962
963         assertNull(follower.getSnapshotTracker());
964     }
965
966     @Test
967     public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
968         logStart("testInitialSyncUpWithHandleInstallSnapshot");
969
970         MockRaftActorContext context = createActorContext();
971         context.setCommitIndex(-1);
972
973         follower = createBehavior(context);
974
975         ByteString bsSnapshot  = createSnapshot();
976         int offset = 0;
977         int snapshotLength = bsSnapshot.size();
978         int chunkSize = 50;
979         int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
980         int lastIncludedIndex = 1;
981         int chunkIndex = 1;
982         InstallSnapshot lastInstallSnapshot = null;
983
984         for (int i = 0; i < totalChunks; i++) {
985             byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
986             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
987                     chunkData, chunkIndex, totalChunks);
988             follower.handleMessage(leaderActor, lastInstallSnapshot);
989             offset = offset + 50;
990             lastIncludedIndex++;
991             chunkIndex++;
992         }
993
994         FollowerInitialSyncUpStatus syncStatus =
995                 MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
996
997         assertFalse(syncStatus.isInitialSyncDone());
998
999         // Clear all the messages
1000         MessageCollectorActor.clearMessages(followerActor);
1001
1002         context.setLastApplied(101);
1003         context.setCommitIndex(101);
1004         setLastLogEntry(context, 1, 101,
1005                 new MockRaftActorContext.MockPayload(""));
1006
1007         List<ReplicatedLogEntry> entries = Arrays.asList(
1008                 newReplicatedLogEntry(2, 101, "foo"));
1009
1010         // The new commitIndex is 101
1011         AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
1012         follower.handleMessage(leaderActor, appendEntries);
1013
1014         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
1015
1016         assertTrue(syncStatus.isInitialSyncDone());
1017     }
1018
1019     @Test
1020     public void testHandleOutOfSequenceInstallSnapshot() {
1021         logStart("testHandleOutOfSequenceInstallSnapshot");
1022
1023         MockRaftActorContext context = createActorContext();
1024
1025         follower = createBehavior(context);
1026
1027         ByteString bsSnapshot = createSnapshot();
1028
1029         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
1030                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
1031         follower.handleMessage(leaderActor, installSnapshot);
1032
1033         InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1034                 InstallSnapshotReply.class);
1035
1036         assertEquals("isSuccess", false, reply.isSuccess());
1037         assertEquals("getChunkIndex", -1, reply.getChunkIndex());
1038         assertEquals("getTerm", 1, reply.getTerm());
1039         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
1040
1041         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
1042     }
1043
1044     @Test
1045     public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
1046         MockRaftActorContext context = createActorContext();
1047
1048         Stopwatch stopwatch = Stopwatch.createStarted();
1049
1050         follower = createBehavior(context);
1051
1052         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1053
1054         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
1055
1056         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
1057
1058         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1059         assertTrue("Expected Candidate", newBehavior instanceof Candidate);
1060     }
1061
1062     @Test
1063     public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
1064         MockRaftActorContext context = createActorContext();
1065         context.setConfigParams(new DefaultConfigParamsImpl() {
1066             @Override
1067             public FiniteDuration getElectionTimeOutInterval() {
1068                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
1069             }
1070         });
1071
1072         context.setRaftPolicy(createRaftPolicy(false, false));
1073
1074         follower = createBehavior(context);
1075
1076         TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
1077         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
1078         assertSame("handleMessage result", follower, newBehavior);
1079     }
1080
1081     @Test
1082     public void testFollowerSchedulesElectionIfNonVoting() {
1083         MockRaftActorContext context = createActorContext();
1084         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
1085         ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
1086                 FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
1087         ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
1088
1089         follower = new Follower(context, "leader", (short)1);
1090
1091         ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
1092                 ElectionTimeout.class);
1093         RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
1094         assertSame("handleMessage result", follower, newBehavior);
1095         assertNull("Expected null leaderId", follower.getLeaderId());
1096     }
1097
1098     @Test
1099     public void testElectionScheduledWhenAnyRaftRPCReceived() {
1100         MockRaftActorContext context = createActorContext();
1101         follower = createBehavior(context);
1102         follower.handleMessage(leaderActor, new RaftRPC() {
1103             private static final long serialVersionUID = 1L;
1104
1105             @Override
1106             public long getTerm() {
1107                 return 100;
1108             }
1109         });
1110         verify(follower).scheduleElection(any(FiniteDuration.class));
1111     }
1112
1113     @Test
1114     public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
1115         MockRaftActorContext context = createActorContext();
1116         follower = createBehavior(context);
1117         follower.handleMessage(leaderActor, "non-raft-rpc");
1118         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
1119     }
1120
1121     @Test
1122     public void testCaptureSnapshotOnLastEntryInAppendEntries() {
1123         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
1124         logStart(id);
1125
1126         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1127
1128         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1129         config.setSnapshotBatchCount(2);
1130         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1131
1132         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1133         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1134         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1135                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1136         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1137                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1138         followerRaftActor.set(followerActorRef.underlyingActor());
1139         followerRaftActor.get().waitForInitializeBehaviorComplete();
1140
1141         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1142         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1143         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1144
1145         List<ReplicatedLogEntry> entries = Arrays.asList(
1146                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
1147
1148         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
1149
1150         followerActorRef.tell(appendEntries, leaderActor);
1151
1152         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1153         assertEquals("isSuccess", true, reply.isSuccess());
1154
1155         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1156
1157         InMemoryJournal.waitForDeleteMessagesComplete(id);
1158         InMemoryJournal.waitForWriteMessagesComplete(id);
1159         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
1160         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1161         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
1162         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1163         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1164         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1165         assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1166
1167         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1168         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1169         assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
1170         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1171         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
1172         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
1173                 MockRaftActor.fromState(snapshot.getState()));
1174     }
1175
1176     @Test
1177     public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
1178         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
1179         logStart(id);
1180
1181         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1182
1183         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1184         config.setSnapshotBatchCount(2);
1185         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1186
1187         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1188         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1189         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1190                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1191         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1192                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1193         followerRaftActor.set(followerActorRef.underlyingActor());
1194         followerRaftActor.get().waitForInitializeBehaviorComplete();
1195
1196         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1197         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1198         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1199
1200         List<ReplicatedLogEntry> entries = Arrays.asList(
1201                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1202                 newReplicatedLogEntry(1, 2, "three"));
1203
1204         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
1205
1206         followerActorRef.tell(appendEntries, leaderActor);
1207
1208         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1209         assertEquals("isSuccess", true, reply.isSuccess());
1210
1211         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1212
1213         InMemoryJournal.waitForDeleteMessagesComplete(id);
1214         InMemoryJournal.waitForWriteMessagesComplete(id);
1215         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
1216         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1217         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
1218         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1219         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1220         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1221         assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1222
1223         assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
1224         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1225         assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
1226         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1227         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1228         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1229                 entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
1230
1231         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1232         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
1233
1234         // Reinstate the actor from persistence
1235
1236         actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
1237
1238         followerActorRef = actorFactory.createTestActor(builder.props()
1239                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1240         followerRaftActor.set(followerActorRef.underlyingActor());
1241         followerRaftActor.get().waitForInitializeBehaviorComplete();
1242
1243         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
1244         assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
1245         assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
1246         assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
1247         assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
1248                 entries.get(2).getData()), followerRaftActor.get().getState());
1249     }
1250
1251     @Test
1252     public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
1253         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
1254         logStart(id);
1255
1256         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
1257
1258         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1259         config.setSnapshotBatchCount(1);
1260         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
1261
1262         final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
1263         RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
1264         Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
1265                 .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
1266         TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
1267                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
1268         followerRaftActor.set(followerActorRef.underlyingActor());
1269         followerRaftActor.get().waitForInitializeBehaviorComplete();
1270
1271         InMemorySnapshotStore.addSnapshotSavedLatch(id);
1272         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
1273         InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
1274
1275         List<ReplicatedLogEntry> entries = Arrays.asList(
1276                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
1277                 newReplicatedLogEntry(1, 2, "three"));
1278
1279         AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
1280
1281         followerActorRef.tell(appendEntries, leaderActor);
1282
1283         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1284         assertEquals("isSuccess", true, reply.isSuccess());
1285
1286         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
1287
1288         InMemoryJournal.waitForDeleteMessagesComplete(id);
1289         InMemoryJournal.waitForWriteMessagesComplete(id);
1290         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
1291         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
1292         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
1293         List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
1294         assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
1295         assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
1296         assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
1297
1298         assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
1299         assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
1300         assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
1301         assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
1302         assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
1303         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
1304         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
1305         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
1306                 MockRaftActor.fromState(snapshot.getState()));
1307     }
1308
1309     @Test
1310     public void testNeedsLeaderAddress() {
1311         logStart("testNeedsLeaderAddress");
1312
1313         MockRaftActorContext context = createActorContext();
1314         context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
1315         context.addToPeers("leader", null, VotingState.VOTING);
1316         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
1317
1318         follower = createBehavior(context);
1319
1320         follower.handleMessage(leaderActor,
1321                 new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
1322
1323         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1324         assertTrue(reply.isNeedsLeaderAddress());
1325         MessageCollectorActor.clearMessages(leaderActor);
1326
1327         PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
1328         ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
1329
1330         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
1331                 (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
1332
1333         reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
1334         assertFalse(reply.isNeedsLeaderAddress());
1335
1336         verify(mockResolver).setResolved("leader", leaderActor.path().toString());
1337     }
1338
1339     @SuppressWarnings("checkstyle:IllegalCatch")
1340     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
1341             final AtomicReference<MockRaftActor> followerRaftActor) {
1342         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
1343             @Override
1344             public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
1345                 try {
1346                     actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
1347                             installSnapshotStream), actorRef);
1348                 } catch (RuntimeException e) {
1349                     throw e;
1350                 } catch (Exception e) {
1351                     throw new RuntimeException(e);
1352                 }
1353             }
1354
1355             @Override
1356             public void applySnapshot(final State snapshotState) {
1357             }
1358
1359             @Override
1360             public State deserializeSnapshot(final ByteSource snapshotBytes) {
1361                 throw new UnsupportedOperationException();
1362             }
1363         };
1364         return snapshotCohort;
1365     }
1366
1367     public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
1368         int snapshotLength = bs.size();
1369         int start = offset;
1370         int size = chunkSize;
1371         if (chunkSize > snapshotLength) {
1372             size = snapshotLength;
1373         } else {
1374             if (start + chunkSize > snapshotLength) {
1375                 size = snapshotLength - start;
1376             }
1377         }
1378
1379         byte[] nextChunk = new byte[size];
1380         bs.copyTo(nextChunk, start, 0, size);
1381         return nextChunk;
1382     }
1383
1384     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1385             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
1386         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
1387     }
1388
1389     private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
1390             final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
1391             final boolean expForceInstallSnapshot) {
1392
1393         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
1394                 AppendEntriesReply.class);
1395
1396         assertEquals("isSuccess", expSuccess, reply.isSuccess());
1397         assertEquals("getTerm", expTerm, reply.getTerm());
1398         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
1399         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
1400         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
1401         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
1402         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
1403         assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
1404     }
1405
1406
1407     private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
1408         return new SimpleReplicatedLogEntry(index, term,
1409                 new MockRaftActorContext.MockPayload(data));
1410     }
1411
1412     private ByteString createSnapshot() {
1413         HashMap<String, String> followerSnapshot = new HashMap<>();
1414         followerSnapshot.put("1", "A");
1415         followerSnapshot.put("2", "B");
1416         followerSnapshot.put("3", "C");
1417
1418         return toByteString(followerSnapshot);
1419     }
1420
1421     @Override
1422     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
1423             final ActorRef actorRef, final RaftRPC rpc) {
1424         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
1425
1426         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
1427         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
1428     }
1429
1430     @Override
1431     protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
1432         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
1433         assertEquals("isSuccess", true, reply.isSuccess());
1434     }
1435 }